diff --git a/runtime/drivers/athena/information_schema.go b/runtime/drivers/athena/information_schema.go index 053f053e6799..86491edb6417 100644 --- a/runtime/drivers/athena/information_schema.go +++ b/runtime/drivers/athena/information_schema.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/athena" "github.com/aws/aws-sdk-go-v2/service/athena/types" "github.com/aws/smithy-go" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pagination" "golang.org/x/sync/errgroup" @@ -180,6 +181,47 @@ func (c *Connection) listCatalogs(ctx context.Context, client *athena.Client) ([ return catalogs, nil } +// All implements drivers.OLAPInformationSchema. +func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + return nil // Not implemented +} + +// Lookup implements drivers.OLAPInformationSchema. +func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { + meta, err := c.GetTable(ctx, db, schema, name) + if err != nil { + return nil, err + } + runtimeSchema := &runtimev1.StructType{ + Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)), + } + for name, typ := range meta.Schema { + runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{ + Name: name, + Type: athenaTypeToRuntimeType(typ), + }) + } + return &drivers.OlapTable{ + Database: db, + DatabaseSchema: schema, + Name: name, + View: meta.View, + Schema: runtimeSchema, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, nil +} + func (c *Connection) listSchemasForCatalog(ctx context.Context, client *athena.Client, catalog string) ([]*drivers.DatabaseSchemaInfo, error) { // Use catalog if specified var q string diff --git a/runtime/drivers/athena/olap.go b/runtime/drivers/athena/olap.go index aaf8f8df421c..3ce0698ccaef 100644 --- a/runtime/drivers/athena/olap.go +++ b/runtime/drivers/athena/olap.go @@ -33,7 +33,7 @@ func (c *Connection) Exec(ctx context.Context, stmt *drivers.Statement) error { } // InformationSchema implements drivers.OLAPStore. -func (c *Connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *Connection) InformationSchema() drivers.InformationSchema { return c } @@ -108,47 +108,6 @@ func (c *Connection) WithConnection(ctx context.Context, priority int, fn driver return drivers.ErrNotImplemented } -// All implements drivers.OLAPInformationSchema. -func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - return nil // Not implemented -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - runtimeSchema := &runtimev1.StructType{ - Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)), - } - for name, typ := range meta.Schema { - runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: athenaTypeToRuntimeType(typ), - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: runtimeSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} - type rows struct { queryID string client *athena.Client diff --git a/runtime/drivers/bigquery/information_schema.go b/runtime/drivers/bigquery/information_schema.go index 6ac1ee0656bf..657904074680 100644 --- a/runtime/drivers/bigquery/information_schema.go +++ b/runtime/drivers/bigquery/information_schema.go @@ -173,3 +173,76 @@ func (c *Connection) GetTable(ctx context.Context, database, databaseSchema, tab return r, nil } + +// All implements drivers.OLAPInformationSchema. +func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + client, err := c.getClient(ctx) + if err != nil { + return err + } + + q := fmt.Sprintf("SELECT ddl FROM `%s.%s.INFORMATION_SCHEMA.TABLES` WHERE table_name = @name", table.Database, table.DatabaseSchema) + cq := client.Query(q) + cq.Parameters = []bigquery.QueryParameter{ + {Name: "name", Value: table.Name}, + } + + it, err := cq.Read(ctx) + if err != nil { + return err + } + + var row struct { + DDL string `bigquery:"ddl"` + } + err = it.Next(&row) + if err != nil { + return err + } + table.DDL = row.DDL + return nil +} + +// Lookup implements drivers.OLAPInformationSchema. +func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { + client, err := c.getClient(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get BigQuery client: %w", err) + } + + var table *bigquery.Table + if db != "" { + table = client.DatasetInProject(db, schema).Table(name) + } else { + table = client.Dataset(schema).Table(name) + } + + meta, err := table.Metadata(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get table metadata: %w", err) + } + runtimeSchema, err := fromBQSchema(meta.Schema) + if err != nil { + return nil, err + } + tbl := &drivers.OlapTable{ + Database: db, + DatabaseSchema: schema, + Name: name, + View: meta.Type == bigquery.ViewTable, + Schema: runtimeSchema, + UnsupportedCols: nil, // all columns are currently being mapped though may not be as specific as in BigQuery + PhysicalSizeBytes: 0, + } + return tbl, nil +} diff --git a/runtime/drivers/bigquery/olap.go b/runtime/drivers/bigquery/olap.go index 45a7a0d99e49..e32a23dc33aa 100644 --- a/runtime/drivers/bigquery/olap.go +++ b/runtime/drivers/bigquery/olap.go @@ -42,7 +42,7 @@ func (c *Connection) Exec(ctx context.Context, stmt *drivers.Statement) error { } // InformationSchema implements drivers.OLAPStore. -func (c *Connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *Connection) InformationSchema() drivers.InformationSchema { return c } @@ -146,79 +146,6 @@ func (c *Connection) WithConnection(ctx context.Context, priority int, fn driver return drivers.ErrNotImplemented } -// All implements drivers.OLAPInformationSchema. -func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - client, err := c.getClient(ctx) - if err != nil { - return err - } - - q := fmt.Sprintf("SELECT ddl FROM `%s.%s.INFORMATION_SCHEMA.TABLES` WHERE table_name = @name", table.Database, table.DatabaseSchema) - cq := client.Query(q) - cq.Parameters = []bigquery.QueryParameter{ - {Name: "name", Value: table.Name}, - } - - it, err := cq.Read(ctx) - if err != nil { - return err - } - - var row struct { - DDL string `bigquery:"ddl"` - } - err = it.Next(&row) - if err != nil { - return err - } - table.DDL = row.DDL - return nil -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - client, err := c.getClient(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get BigQuery client: %w", err) - } - - var table *bigquery.Table - if db != "" { - table = client.DatasetInProject(db, schema).Table(name) - } else { - table = client.Dataset(schema).Table(name) - } - - meta, err := table.Metadata(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get table metadata: %w", err) - } - runtimeSchema, err := fromBQSchema(meta.Schema) - if err != nil { - return nil, err - } - tbl := &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.Type == bigquery.ViewTable, - Schema: runtimeSchema, - UnsupportedCols: nil, // all columns are currently being mapped though may not be as specific as in BigQuery - PhysicalSizeBytes: 0, - } - return tbl, nil -} - func (c *Connection) Head(ctx context.Context, db, schema, table string, limit int64) (*drivers.Result, error) { client, err := c.getClient(ctx) if err != nil { diff --git a/runtime/drivers/clickhouse/olap.go b/runtime/drivers/clickhouse/olap.go index 4cd597469add..042b293bbcb8 100644 --- a/runtime/drivers/clickhouse/olap.go +++ b/runtime/drivers/clickhouse/olap.go @@ -345,7 +345,7 @@ func (c *Connection) QuerySchema(ctx context.Context, query string, args []any) return schema, nil } -func (c *Connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *Connection) InformationSchema() drivers.InformationSchema { return c } diff --git a/runtime/drivers/databricks/information_schema.go b/runtime/drivers/databricks/information_schema.go index bc49885d7b9f..2ca6fc28a919 100644 --- a/runtime/drivers/databricks/information_schema.go +++ b/runtime/drivers/databricks/information_schema.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pagination" ) @@ -178,6 +179,90 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab return t, nil } +// All implements drivers.OLAPInformationSchema. +func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + db, err := c.getDB(ctx) + if err != nil { + return err + } + + fqn := DialectDatabricks.EscapeTable(table.Database, table.DatabaseSchema, table.Name) + + objectType := "TABLE" + if table.View { + objectType = "VIEW" + } + + var ddl string + err = db.QueryRowContext(ctx, fmt.Sprintf("SHOW CREATE %s %s", objectType, fqn)).Scan(&ddl) + if err != nil { + return err + } + table.DDL = ddl + return nil +} + +// Lookup implements drivers.OLAPInformationSchema. +func (c *connection) Lookup(ctx context.Context, database, schema, name string) (*drivers.OlapTable, error) { + prefix := catalogPrefix(database) + q := fmt.Sprintf(` + SELECT + CASE WHEN t.table_type = 'VIEW' THEN true ELSE false END AS is_view, + c.column_name, + c.data_type + FROM %sinformation_schema.tables t + JOIN %sinformation_schema.columns c + ON t.table_schema = c.table_schema AND t.table_name = c.table_name + WHERE t.table_schema = ? AND t.table_name = ? + ORDER BY c.ordinal_position + `, prefix, prefix) + + conn, err := c.getDB(ctx) + if err != nil { + return nil, err + } + + rows, err := conn.QueryContext(ctx, q, schema, name) + if err != nil { + return nil, err + } + defer rows.Close() + + var isView bool + var fields []*runtimev1.StructType_Field + var colName, colType string + for rows.Next() { + if err := rows.Scan(&isView, &colName, &colType); err != nil { + return nil, err + } + fields = append(fields, &runtimev1.StructType_Field{ + Name: colName, + Type: databaseTypeToPB(colType), + }) + } + if err := rows.Err(); err != nil { + return nil, err + } + + return &drivers.OlapTable{ + Database: database, + DatabaseSchema: schema, + Name: name, + View: isView, + Schema: &runtimev1.StructType{Fields: fields}, + }, nil +} + // catalogPrefix returns "." if catalog is non-empty, or "" otherwise. func catalogPrefix(catalog string) string { if catalog == "" { diff --git a/runtime/drivers/databricks/olap.go b/runtime/drivers/databricks/olap.go index 83e4dfb427c9..bb9904eae107 100644 --- a/runtime/drivers/databricks/olap.go +++ b/runtime/drivers/databricks/olap.go @@ -32,7 +32,7 @@ func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error { } // InformationSchema implements drivers.OLAPStore. -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *connection) InformationSchema() drivers.InformationSchema { return c } @@ -103,90 +103,6 @@ func (c *connection) WithConnection(ctx context.Context, priority int, fn driver return drivers.ErrNotImplemented } -// All implements drivers.OLAPInformationSchema. -func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - db, err := c.getDB(ctx) - if err != nil { - return err - } - - fqn := DialectDatabricks.EscapeTable(table.Database, table.DatabaseSchema, table.Name) - - objectType := "TABLE" - if table.View { - objectType = "VIEW" - } - - var ddl string - err = db.QueryRowContext(ctx, fmt.Sprintf("SHOW CREATE %s %s", objectType, fqn)).Scan(&ddl) - if err != nil { - return err - } - table.DDL = ddl - return nil -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, database, schema, name string) (*drivers.OlapTable, error) { - prefix := catalogPrefix(database) - q := fmt.Sprintf(` - SELECT - CASE WHEN t.table_type = 'VIEW' THEN true ELSE false END AS is_view, - c.column_name, - c.data_type - FROM %sinformation_schema.tables t - JOIN %sinformation_schema.columns c - ON t.table_schema = c.table_schema AND t.table_name = c.table_name - WHERE t.table_schema = ? AND t.table_name = ? - ORDER BY c.ordinal_position - `, prefix, prefix) - - conn, err := c.getDB(ctx) - if err != nil { - return nil, err - } - - rows, err := conn.QueryContext(ctx, q, schema, name) - if err != nil { - return nil, err - } - defer rows.Close() - - var isView bool - var fields []*runtimev1.StructType_Field - var colName, colType string - for rows.Next() { - if err := rows.Scan(&isView, &colName, &colType); err != nil { - return nil, err - } - fields = append(fields, &runtimev1.StructType_Field{ - Name: colName, - Type: databaseTypeToPB(colType), - }) - } - if err := rows.Err(); err != nil { - return nil, err - } - - return &drivers.OlapTable{ - Database: database, - DatabaseSchema: schema, - Name: name, - View: isView, - Schema: &runtimev1.StructType{Fields: fields}, - }, nil -} - // Head implements drivers.OLAPStore. func (c *connection) Head(ctx context.Context, db, schema, table string, limit int64) (*drivers.Result, error) { tbl, err := c.InformationSchema().Lookup(ctx, db, schema, table) diff --git a/runtime/drivers/druid/olap.go b/runtime/drivers/druid/olap.go index 2ce7e016e717..301d8aeade97 100644 --- a/runtime/drivers/druid/olap.go +++ b/runtime/drivers/druid/olap.go @@ -193,7 +193,7 @@ func (c *connection) QuerySchema(ctx context.Context, query string, args []any) return res.Schema, nil } -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *connection) InformationSchema() drivers.InformationSchema { return c } diff --git a/runtime/drivers/duckdb/olap.go b/runtime/drivers/duckdb/olap.go index e75c2142c5ea..70b927d38521 100644 --- a/runtime/drivers/duckdb/olap.go +++ b/runtime/drivers/duckdb/olap.go @@ -245,7 +245,7 @@ func (c *connection) QuerySchema(ctx context.Context, query string, args []any) return res.Schema, nil } -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *connection) InformationSchema() drivers.InformationSchema { return c } diff --git a/runtime/drivers/information_schema.go b/runtime/drivers/information_schema.go index ed5408bfc075..03cc70a531d5 100644 --- a/runtime/drivers/information_schema.go +++ b/runtime/drivers/information_schema.go @@ -3,9 +3,22 @@ package drivers import ( "context" "fmt" + + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" ) type InformationSchema interface { + // All returns metadata about all tables and views. + // The like argument can optionally be passed to filter the tables by name. + All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*OlapTable, string, error) + // Lookup returns metadata about a specific tables and views. + Lookup(ctx context.Context, db, schema, name string) (*OlapTable, error) + // LoadPhysicalSize populates the PhysicalSizeBytes field of table metadata. + // It should be called aft`er All or Lookup and not on manually created tables. + LoadPhysicalSize(ctx context.Context, tables []*OlapTable) error + // LoadDDL populates the DDL field of a single table's metadata. + // Drivers that don't support DDL retrieval should return nil (leaving DDL empty). + LoadDDL(ctx context.Context, table *OlapTable) error // ListDatabaseSchemas returns all schemas across databases ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*DatabaseSchemaInfo, string, error) // ListTables returns all tables in a schema. @@ -14,6 +27,21 @@ type InformationSchema interface { GetTable(ctx context.Context, database, databaseSchema, table string) (*TableMetadata, error) } +// OlapTable represents a table in an information schema. +type OlapTable struct { + Database string + DatabaseSchema string + IsDefaultDatabase bool + IsDefaultDatabaseSchema bool + Name string + View bool + // Schema is the table schema. It is only set when only single table is looked up. It is not set when listing all tables. + Schema *runtimev1.StructType + UnsupportedCols map[string]string + PhysicalSizeBytes int64 + DDL string +} + const ( // DefaultPageSize is the default page size used when pageSize is not defined DefaultPageSize = 100 diff --git a/runtime/drivers/mysql/information_schema.go b/runtime/drivers/mysql/information_schema.go index 56ab205655b1..c9f631837b83 100644 --- a/runtime/drivers/mysql/information_schema.go +++ b/runtime/drivers/mysql/information_schema.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pagination" ) @@ -169,3 +170,72 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab return res, nil } + +// All implements drivers.OLAPInformationSchema. +func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + db, err := c.getDB(ctx) + if err != nil { + return err + } + + // SHOW CREATE TABLE works for both tables and views in MySQL. + // For tables it returns columns: [Table, Create Table]. + // For views it returns columns: [View, Create View, character_set_client, collation_connection]. + // We extract the DDL by column name to avoid depending on column order or count. + rows, err := db.QueryxContext(ctx, fmt.Sprintf("SHOW CREATE TABLE %s", c.Dialect().EscapeTable(table.Database, table.DatabaseSchema, table.Name))) + if err != nil { + return err + } + defer rows.Close() + + if rows.Next() { + res := make(map[string]any) + if err := rows.MapScan(res); err != nil { + return err + } + for _, key := range []string{"Create Table", "Create View"} { + if v, ok := res[key]; ok && v != nil { + if b, ok := v.([]byte); ok { + table.DDL = string(b) + } + break + } + } + } + return rows.Err() +} + +// Lookup implements drivers.OLAPInformationSchema. +func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { + meta, err := c.GetTable(ctx, db, schema, name) + if err != nil { + return nil, err + } + + rtSchema := &runtimev1.StructType{} + for name, typ := range meta.Schema { + rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ + Name: name, + Type: databaseTypeToPB(typ, true), + }) + } + return &drivers.OlapTable{ + Database: db, + DatabaseSchema: schema, + Name: name, + View: meta.View, + Schema: rtSchema, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, nil +} diff --git a/runtime/drivers/mysql/olap.go b/runtime/drivers/mysql/olap.go index 6bf59acf83da..82d9fde423d4 100644 --- a/runtime/drivers/mysql/olap.go +++ b/runtime/drivers/mysql/olap.go @@ -37,7 +37,7 @@ func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error { } // InformationSchema implements drivers.OLAPStore. -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *connection) InformationSchema() drivers.InformationSchema { return c } @@ -135,75 +135,6 @@ func (c *connection) WithConnection(ctx context.Context, priority int, fn driver return drivers.ErrNotImplemented } -// All implements drivers.OLAPInformationSchema. -func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - db, err := c.getDB(ctx) - if err != nil { - return err - } - - // SHOW CREATE TABLE works for both tables and views in MySQL. - // For tables it returns columns: [Table, Create Table]. - // For views it returns columns: [View, Create View, character_set_client, collation_connection]. - // We extract the DDL by column name to avoid depending on column order or count. - rows, err := db.QueryxContext(ctx, fmt.Sprintf("SHOW CREATE TABLE %s", c.Dialect().EscapeTable(table.Database, table.DatabaseSchema, table.Name))) - if err != nil { - return err - } - defer rows.Close() - - if rows.Next() { - res := make(map[string]any) - if err := rows.MapScan(res); err != nil { - return err - } - for _, key := range []string{"Create Table", "Create View"} { - if v, ok := res[key]; ok && v != nil { - if b, ok := v.([]byte); ok { - table.DDL = string(b) - } - break - } - } - } - return rows.Err() -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - - rtSchema := &runtimev1.StructType{} - for name, typ := range meta.Schema { - rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: databaseTypeToPB(typ, true), - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: rtSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} - func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) { if r == nil { return nil, nil diff --git a/runtime/drivers/olap.go b/runtime/drivers/olap.go index 5c51ae75c0f8..e7a528b8c8da 100644 --- a/runtime/drivers/olap.go +++ b/runtime/drivers/olap.go @@ -47,7 +47,7 @@ type OLAPStore interface { // QuerySchema returns the schema of the sql without trying not to run the actual query. QuerySchema(ctx context.Context, query string, args []any) (*runtimev1.StructType, error) // InformationSchema enables introspecting the tables and views available in the OLAP driver. - InformationSchema() OLAPInformationSchema + InformationSchema() InformationSchema // EstimateSize returns an estimate of the total data size in bytes. // Returns -1 if size estimation is not supported by the driver. EstimateSize(ctx context.Context) (int64, error) @@ -164,34 +164,3 @@ func (r *Result) Close() error { } return firstErr } - -// OLAPInformationSchema contains information about existing tables in an OLAP driver. -// Table lookups should be case insensitive. -type OLAPInformationSchema interface { - // All returns metadata about all tables and views. - // The like argument can optionally be passed to filter the tables by name. - All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*OlapTable, string, error) - // Lookup returns metadata about a specific tables and views. - Lookup(ctx context.Context, db, schema, name string) (*OlapTable, error) - // LoadPhysicalSize populates the PhysicalSizeBytes field of table metadata. - // It should be called after All or Lookup and not on manually created tables. - LoadPhysicalSize(ctx context.Context, tables []*OlapTable) error - // LoadDDL populates the DDL field of a single table's metadata. - // Drivers that don't support DDL retrieval should return nil (leaving DDL empty). - LoadDDL(ctx context.Context, table *OlapTable) error -} - -// OlapTable represents a table in an information schema. -type OlapTable struct { - Database string - DatabaseSchema string - IsDefaultDatabase bool - IsDefaultDatabaseSchema bool - Name string - View bool - // Schema is the table schema. It is only set when only single table is looked up. It is not set when listing all tables. - Schema *runtimev1.StructType - UnsupportedCols map[string]string - PhysicalSizeBytes int64 - DDL string -} diff --git a/runtime/drivers/pinot/olap.go b/runtime/drivers/pinot/olap.go index d8472087fbf3..4ff9b5f85769 100644 --- a/runtime/drivers/pinot/olap.go +++ b/runtime/drivers/pinot/olap.go @@ -154,7 +154,7 @@ func (c *connection) QuerySchema(ctx context.Context, query string, args []any) return res.Schema, nil } -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *connection) InformationSchema() drivers.InformationSchema { return c } diff --git a/runtime/drivers/postgres/information_schema.go b/runtime/drivers/postgres/information_schema.go index 4aee02f0c7c0..a2e0b9ad94dd 100644 --- a/runtime/drivers/postgres/information_schema.go +++ b/runtime/drivers/postgres/information_schema.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pagination" ) @@ -169,3 +170,95 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab Schema: columns, }, rows.Err() } + +// All implements drivers.OLAPInformationSchema. +func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +// Note: table.Database is not used; in Postgres, the database is determined by the connection. +func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + db, err := c.getDB(ctx) + if err != nil { + return err + } + + schema := table.DatabaseSchema + if schema == "" { + if err := db.QueryRowContext(ctx, "SELECT current_schema()").Scan(&schema); err != nil { + return err + } + } + + if table.View { + // For views: use pg_get_viewdef + var ddl string + q := ` + SELECT 'CREATE VIEW ' || quote_ident(n.nspname) || '.' || quote_ident(c.relname) || ' AS ' || pg_get_viewdef(c.oid, true) + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind IN ('v', 'm') + ` + err = db.QueryRowContext(ctx, q, schema, table.Name).Scan(&ddl) + if err != nil { + return err + } + table.DDL = ddl + return nil + } + + // Postgres does not have a built-in way to get the DDL for a table, so we reconstruct a basic CREATE TABLE statement from the available metadata (won't include indexes, constraints, etc.). + q := ` + SELECT + 'CREATE TABLE ' || quote_ident(n.nspname) || '.' || quote_ident(c.relname) || ' (' || + string_agg( + quote_ident(a.attname) || ' ' || format_type(a.atttypid, a.atttypmod) || + CASE WHEN a.attnotnull THEN ' NOT NULL' ELSE '' END, + ', ' ORDER BY a.attnum + ) || ')' + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_attribute a ON a.attrelid = c.oid + WHERE n.nspname = $1 AND c.relname = $2 AND a.attnum > 0 AND NOT a.attisdropped + GROUP BY n.nspname, c.relname + ` + var ddl string + err = db.QueryRowContext(ctx, q, schema, table.Name).Scan(&ddl) + if err != nil { + return err + } + table.DDL = ddl + return nil +} + +// Lookup implements drivers.OLAPInformationSchema. +func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { + meta, err := c.GetTable(ctx, db, schema, name) + if err != nil { + return nil, err + } + + rtSchema := &runtimev1.StructType{} + for name, typ := range meta.Schema { + t := databaseTypeToPB(typ) + rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ + Name: name, + Type: t, + }) + } + return &drivers.OlapTable{ + Database: db, + DatabaseSchema: schema, + Name: name, + View: meta.View, + Schema: rtSchema, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, nil +} diff --git a/runtime/drivers/postgres/olap.go b/runtime/drivers/postgres/olap.go index 4f75da8c37e4..0bd9bfc94a72 100644 --- a/runtime/drivers/postgres/olap.go +++ b/runtime/drivers/postgres/olap.go @@ -32,7 +32,7 @@ func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error { } // InformationSchema implements drivers.OLAPStore. -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *connection) InformationSchema() drivers.InformationSchema { return c } @@ -115,98 +115,6 @@ func (c *connection) WithConnection(ctx context.Context, priority int, fn driver return drivers.ErrNotImplemented } -// All implements drivers.OLAPInformationSchema. -func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -// Note: table.Database is not used; in Postgres, the database is determined by the connection. -func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - db, err := c.getDB(ctx) - if err != nil { - return err - } - - schema := table.DatabaseSchema - if schema == "" { - if err := db.QueryRowContext(ctx, "SELECT current_schema()").Scan(&schema); err != nil { - return err - } - } - - if table.View { - // For views: use pg_get_viewdef - var ddl string - q := ` - SELECT 'CREATE VIEW ' || quote_ident(n.nspname) || '.' || quote_ident(c.relname) || ' AS ' || pg_get_viewdef(c.oid, true) - FROM pg_class c - JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind IN ('v', 'm') - ` - err = db.QueryRowContext(ctx, q, schema, table.Name).Scan(&ddl) - if err != nil { - return err - } - table.DDL = ddl - return nil - } - - // Postgres does not have a built-in way to get the DDL for a table, so we reconstruct a basic CREATE TABLE statement from the available metadata (won't include indexes, constraints, etc.). - q := ` - SELECT - 'CREATE TABLE ' || quote_ident(n.nspname) || '.' || quote_ident(c.relname) || ' (' || - string_agg( - quote_ident(a.attname) || ' ' || format_type(a.atttypid, a.atttypmod) || - CASE WHEN a.attnotnull THEN ' NOT NULL' ELSE '' END, - ', ' ORDER BY a.attnum - ) || ')' - FROM pg_class c - JOIN pg_namespace n ON n.oid = c.relnamespace - JOIN pg_attribute a ON a.attrelid = c.oid - WHERE n.nspname = $1 AND c.relname = $2 AND a.attnum > 0 AND NOT a.attisdropped - GROUP BY n.nspname, c.relname - ` - var ddl string - err = db.QueryRowContext(ctx, q, schema, table.Name).Scan(&ddl) - if err != nil { - return err - } - table.DDL = ddl - return nil -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - - rtSchema := &runtimev1.StructType{} - for name, typ := range meta.Schema { - t := databaseTypeToPB(typ) - rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: t, - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: rtSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} - func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) { fds, err := r.ColumnTypes() if err != nil { diff --git a/runtime/drivers/redshift/information_schema.go b/runtime/drivers/redshift/information_schema.go index 05f0992faf51..f2c4e29ce274 100644 --- a/runtime/drivers/redshift/information_schema.go +++ b/runtime/drivers/redshift/information_schema.go @@ -8,6 +8,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/redshiftdata" "github.com/aws/aws-sdk-go-v2/service/redshiftdata/types" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pagination" ) @@ -191,3 +192,44 @@ ORDER BY ordinal_position; func escapeStringValue(s string) string { return fmt.Sprintf("'%s'", strings.ReplaceAll(s, "'", "''")) } + +// All implements drivers.OLAPInformationSchema. +func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + return nil // Not implemented +} + +// Lookup implements drivers.OLAPInformationSchema. +func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { + meta, err := c.GetTable(ctx, db, schema, name) + if err != nil { + return nil, err + } + runtimeSchema := &runtimev1.StructType{ + Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)), + } + for name, typ := range meta.Schema { + runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{ + Name: name, + Type: redshiftTypeToRuntimeType(typ), + }) + } + return &drivers.OlapTable{ + Database: db, + DatabaseSchema: schema, + Name: name, + View: meta.View, + Schema: runtimeSchema, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, nil +} diff --git a/runtime/drivers/redshift/olap.go b/runtime/drivers/redshift/olap.go index 2b2985147733..5ca7589d76f2 100644 --- a/runtime/drivers/redshift/olap.go +++ b/runtime/drivers/redshift/olap.go @@ -34,7 +34,7 @@ func (c *Connection) Exec(ctx context.Context, stmt *drivers.Statement) error { } // InformationSchema implements drivers.OLAPStore. -func (c *Connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *Connection) InformationSchema() drivers.InformationSchema { return c } @@ -130,47 +130,6 @@ func (c *Connection) WithConnection(ctx context.Context, priority int, fn driver return drivers.ErrNotImplemented } -// All implements drivers.OLAPInformationSchema. -func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - return nil // Not implemented -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - runtimeSchema := &runtimev1.StructType{ - Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)), - } - for name, typ := range meta.Schema { - runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: redshiftTypeToRuntimeType(typ), - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: runtimeSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} - type rows struct { queryID string client *redshiftdata.Client diff --git a/runtime/drivers/snowflake/information_schema.go b/runtime/drivers/snowflake/information_schema.go index c6b67da28d39..721d0711513a 100644 --- a/runtime/drivers/snowflake/information_schema.go +++ b/runtime/drivers/snowflake/information_schema.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pagination" ) @@ -184,6 +185,70 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab return t, nil } +// All implements drivers.OLAPInformationSchema. +func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + db, err := c.getDB(ctx) + if err != nil { + return err + } + + // HACK: Since All and Lookup don't always return the correct casing, we uppercase the table name here as that's usually necessary in Snowflake. + // This is a workaround until we return correct casing from All and Lookup. + fqn := c.Dialect().EscapeTable(strings.ToUpper(table.Database), strings.ToUpper(table.DatabaseSchema), strings.ToUpper(table.Name)) + + objectType := "TABLE" + if table.View { + objectType = "VIEW" + } + + var ddl string + err = db.QueryRowContext(ctx, fmt.Sprintf("SELECT GET_DDL('%s', ?)", objectType), fqn).Scan(&ddl) + if err != nil { + return err + } + table.DDL = ddl + return nil +} + +// Lookup implements drivers.OLAPInformationSchema. +func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { + meta, err := c.GetTable(ctx, db, schema, name) + if err != nil { + return nil, err + } + + rtSchema := &runtimev1.StructType{} + for name, typ := range meta.Schema { + t, err := databaseTypeToPB(typ, 0, true) // add scale and nullability if needed + if err != nil { + return nil, err + } + rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ + Name: name, + Type: t, + }) + } + return &drivers.OlapTable{ + Database: db, + DatabaseSchema: schema, + Name: name, + View: meta.View, + Schema: rtSchema, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, nil +} + func getCurrentDatabaseAndSchema(ctx context.Context, db *sql.DB) (string, string, error) { query := "SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()" diff --git a/runtime/drivers/snowflake/olap.go b/runtime/drivers/snowflake/olap.go index f706f51d3969..008d0b5d39d4 100644 --- a/runtime/drivers/snowflake/olap.go +++ b/runtime/drivers/snowflake/olap.go @@ -32,7 +32,7 @@ func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error { } // InformationSchema implements drivers.OLAPStore. -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { +func (c *connection) InformationSchema() drivers.InformationSchema { return c } @@ -133,70 +133,6 @@ func (c *connection) WithConnection(ctx context.Context, priority int, fn driver return drivers.ErrNotImplemented } -// All implements drivers.OLAPInformationSchema. -func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - db, err := c.getDB(ctx) - if err != nil { - return err - } - - // HACK: Since All and Lookup don't always return the correct casing, we uppercase the table name here as that's usually necessary in Snowflake. - // This is a workaround until we return correct casing from All and Lookup. - fqn := c.Dialect().EscapeTable(strings.ToUpper(table.Database), strings.ToUpper(table.DatabaseSchema), strings.ToUpper(table.Name)) - - objectType := "TABLE" - if table.View { - objectType = "VIEW" - } - - var ddl string - err = db.QueryRowContext(ctx, fmt.Sprintf("SELECT GET_DDL('%s', ?)", objectType), fqn).Scan(&ddl) - if err != nil { - return err - } - table.DDL = ddl - return nil -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - - rtSchema := &runtimev1.StructType{} - for name, typ := range meta.Schema { - t, err := databaseTypeToPB(typ, 0, true) // add scale and nullability if needed - if err != nil { - return nil, err - } - rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: t, - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: rtSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} - func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) { if r == nil { return nil, nil diff --git a/runtime/drivers/starrocks/information_schema.go b/runtime/drivers/starrocks/information_schema.go index 993a0b6aff86..6e5496089f7b 100644 --- a/runtime/drivers/starrocks/information_schema.go +++ b/runtime/drivers/starrocks/information_schema.go @@ -9,20 +9,13 @@ import ( "github.com/rilldata/rill/runtime/drivers" ) -// informationSchema implements drivers.OLAPInformationSchema for StarRocks. -// Uses fully qualified names (catalog.information_schema.tables) instead of SET CATALOG/USE. -type informationSchema struct { - c *connection -} - -var _ drivers.OLAPInformationSchema = (*informationSchema)(nil) - +// StarRocks Uses fully qualified names (catalog.information_schema.tables) instead of SET CATALOG/USE // All returns metadata about all tables and views. // For StarRocks, we query from the configured catalog's information_schema. -func (i *informationSchema) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - db := i.c.db +func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + db := c.db - catalog := i.c.configProp.Catalog + catalog := c.configProp.Catalog // Build query using fully qualified information_schema path // Pattern: catalog.information_schema.tables @@ -95,21 +88,21 @@ func (i *informationSchema) All(ctx context.Context, like string, pageSize uint3 // Lookup returns metadata about a specific table or view. // database parameter = catalog, schema parameter = database in StarRocks terms. -func (i *informationSchema) Lookup(ctx context.Context, database, schema, name string) (*drivers.OlapTable, error) { - db := i.c.db +func (c *connection) Lookup(ctx context.Context, database, schema, name string) (*drivers.OlapTable, error) { + db := c.db // StarRocks mapping: database parameter = catalog // If database is empty, use connector's configured catalog catalog := database if catalog == "" { - catalog = i.c.configProp.Catalog + catalog = c.configProp.Catalog } // StarRocks mapping: schema parameter = database // If schema is empty, use connector's configured database dbSchema := schema if dbSchema == "" { - dbSchema = i.c.configProp.Database + dbSchema = c.configProp.Database } // Query table metadata using fully qualified information_schema path @@ -158,7 +151,7 @@ func (i *informationSchema) Lookup(ctx context.Context, database, schema, name s return nil, err } - runtimeType, err := i.c.databaseTypeToRuntimeType(dataType) + runtimeType, err := c.databaseTypeToRuntimeType(dataType) if err != nil { if errors.Is(err, errUnsupportedType) { unsupportedCols[colName] = dataType @@ -193,7 +186,7 @@ func (i *informationSchema) Lookup(ctx context.Context, database, schema, name s // LoadPhysicalSize populates the PhysicalSizeBytes field of table metadata. // For external catalogs, this may not be available. -func (i *informationSchema) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { +func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { // StarRocks doesn't easily expose physical size for external tables // For internal tables, we could query be_tablets but it's complex // Return without error, leaving PhysicalSizeBytes as 0 @@ -201,16 +194,16 @@ func (i *informationSchema) LoadPhysicalSize(ctx context.Context, tables []*driv } // LoadDDL implements drivers.OLAPInformationSchema. -func (i *informationSchema) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - db := i.c.db +func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + db := c.db catalog := table.Database if catalog == "" { - catalog = i.c.configProp.Catalog + catalog = c.configProp.Catalog } schema := table.DatabaseSchema if schema == "" { - schema = i.c.configProp.Database + schema = c.configProp.Database } q := fmt.Sprintf("SHOW CREATE TABLE %s.%s.%s", safeSQLName(catalog), safeSQLName(schema), safeSQLName(table.Name)) @@ -223,22 +216,13 @@ func (i *informationSchema) LoadDDL(ctx context.Context, table *drivers.OlapTabl return nil } -// InformationSchema interface implementation for drivers.InformationSchema - -var _ drivers.InformationSchema = (*informationSchemaImpl)(nil) - -// informationSchemaImpl implements drivers.InformationSchema for StarRocks -type informationSchemaImpl struct { - c *connection -} - // ListDatabaseSchemas returns a list of database schemas in StarRocks. // StarRocks structure: Catalog -> Database -> Table // We map: Database = catalog, DatabaseSchema = database -func (i *informationSchemaImpl) ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*drivers.DatabaseSchemaInfo, string, error) { - db := i.c.db +func (c *connection) ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*drivers.DatabaseSchemaInfo, string, error) { + db := c.db - catalog := i.c.configProp.Catalog + catalog := c.configProp.Catalog // Query information_schema.schemata using fully qualified path q := fmt.Sprintf(` @@ -296,8 +280,8 @@ func (i *informationSchemaImpl) ListDatabaseSchemas(ctx context.Context, pageSiz // ListTables returns a list of tables in a specific database schema. // database parameter = catalog, databaseSchema parameter = database -func (i *informationSchemaImpl) ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*drivers.TableInfo, string, error) { - db := i.c.db +func (c *connection) ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*drivers.TableInfo, string, error) { + db := c.db // StarRocks mapping: database parameter = catalog catalog := database @@ -366,8 +350,8 @@ func (i *informationSchemaImpl) ListTables(ctx context.Context, database, databa } // GetTable returns metadata about a specific table. -func (i *informationSchemaImpl) GetTable(ctx context.Context, database, databaseSchema, tableName string) (*drivers.TableMetadata, error) { - db := i.c.db +func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tableName string) (*drivers.TableMetadata, error) { + db := c.db // StarRocks mapping: database parameter = catalog catalog := database diff --git a/runtime/drivers/starrocks/olap.go b/runtime/drivers/starrocks/olap.go index 88d94b93aa44..afc6ccd89c6f 100644 --- a/runtime/drivers/starrocks/olap.go +++ b/runtime/drivers/starrocks/olap.go @@ -143,8 +143,8 @@ func (c *connection) QuerySchema(ctx context.Context, query string, args []any) } // InformationSchema implements drivers.OLAPStore. -func (c *connection) InformationSchema() drivers.OLAPInformationSchema { - return &informationSchema{c: c} +func (c *connection) InformationSchema() drivers.InformationSchema { + return c } // EstimateSize implements drivers.OLAPStore. diff --git a/runtime/drivers/starrocks/starrocks.go b/runtime/drivers/starrocks/starrocks.go index a0632764808b..5524130338c5 100644 --- a/runtime/drivers/starrocks/starrocks.go +++ b/runtime/drivers/starrocks/starrocks.go @@ -291,7 +291,7 @@ func (c *connection) AsOLAP(instanceID string) (drivers.OLAPStore, bool) { // AsInformationSchema implements drivers.Handle. func (c *connection) AsInformationSchema() (drivers.InformationSchema, bool) { - return &informationSchemaImpl{c: c}, true + return c, true } // AsObjectStore implements drivers.Handle.