Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions runtime/drivers/athena/information_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/athena"
"github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/aws/smithy-go"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/pagination"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -180,6 +181,47 @@ func (c *Connection) listCatalogs(ctx context.Context, client *athena.Client) ([
return catalogs, nil
}

// All implements drivers.OLAPInformationSchema.
func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) {
return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c)
}

// LoadPhysicalSize implements drivers.OLAPInformationSchema.
func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error {
return nil
}

// LoadDDL implements drivers.OLAPInformationSchema.
func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error {
return nil // Not implemented
}

// Lookup implements drivers.OLAPInformationSchema.
func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) {
meta, err := c.GetTable(ctx, db, schema, name)
if err != nil {
return nil, err
}
runtimeSchema := &runtimev1.StructType{
Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)),
}
for name, typ := range meta.Schema {
runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{
Name: name,
Type: athenaTypeToRuntimeType(typ),
})
}
return &drivers.OlapTable{
Database: db,
DatabaseSchema: schema,
Name: name,
View: meta.View,
Schema: runtimeSchema,
UnsupportedCols: nil,
PhysicalSizeBytes: 0,
}, nil
}

func (c *Connection) listSchemasForCatalog(ctx context.Context, client *athena.Client, catalog string) ([]*drivers.DatabaseSchemaInfo, error) {
// Use catalog if specified
var q string
Expand Down
43 changes: 1 addition & 42 deletions runtime/drivers/athena/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *Connection) Exec(ctx context.Context, stmt *drivers.Statement) error {
}

// InformationSchema implements drivers.OLAPStore.
func (c *Connection) InformationSchema() drivers.OLAPInformationSchema {
func (c *Connection) InformationSchema() drivers.InformationSchema {
return c
}

Expand Down Expand Up @@ -108,47 +108,6 @@ func (c *Connection) WithConnection(ctx context.Context, priority int, fn driver
return drivers.ErrNotImplemented
}

// All implements drivers.OLAPInformationSchema.
func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) {
return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c)
}

// LoadPhysicalSize implements drivers.OLAPInformationSchema.
func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error {
return nil
}

// LoadDDL implements drivers.OLAPInformationSchema.
func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error {
return nil // Not implemented
}

// Lookup implements drivers.OLAPInformationSchema.
func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) {
meta, err := c.GetTable(ctx, db, schema, name)
if err != nil {
return nil, err
}
runtimeSchema := &runtimev1.StructType{
Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)),
}
for name, typ := range meta.Schema {
runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{
Name: name,
Type: athenaTypeToRuntimeType(typ),
})
}
return &drivers.OlapTable{
Database: db,
DatabaseSchema: schema,
Name: name,
View: meta.View,
Schema: runtimeSchema,
UnsupportedCols: nil,
PhysicalSizeBytes: 0,
}, nil
}

type rows struct {
queryID string
client *athena.Client
Expand Down
73 changes: 73 additions & 0 deletions runtime/drivers/bigquery/information_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,76 @@ func (c *Connection) GetTable(ctx context.Context, database, databaseSchema, tab

return r, nil
}

// All implements drivers.OLAPInformationSchema.
func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) {
return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c)
}

// LoadPhysicalSize implements drivers.OLAPInformationSchema.
func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error {
return nil
}

// LoadDDL implements drivers.OLAPInformationSchema.
func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error {
client, err := c.getClient(ctx)
if err != nil {
return err
}

q := fmt.Sprintf("SELECT ddl FROM `%s.%s.INFORMATION_SCHEMA.TABLES` WHERE table_name = @name", table.Database, table.DatabaseSchema)
cq := client.Query(q)
cq.Parameters = []bigquery.QueryParameter{
{Name: "name", Value: table.Name},
}

it, err := cq.Read(ctx)
if err != nil {
return err
}

var row struct {
DDL string `bigquery:"ddl"`
}
err = it.Next(&row)
if err != nil {
return err
}
table.DDL = row.DDL
return nil
}

// Lookup implements drivers.OLAPInformationSchema.
func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) {
client, err := c.getClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get BigQuery client: %w", err)
}

var table *bigquery.Table
if db != "" {
table = client.DatasetInProject(db, schema).Table(name)
} else {
table = client.Dataset(schema).Table(name)
}

meta, err := table.Metadata(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get table metadata: %w", err)
}
runtimeSchema, err := fromBQSchema(meta.Schema)
if err != nil {
return nil, err
}
tbl := &drivers.OlapTable{
Database: db,
DatabaseSchema: schema,
Name: name,
View: meta.Type == bigquery.ViewTable,
Schema: runtimeSchema,
UnsupportedCols: nil, // all columns are currently being mapped though may not be as specific as in BigQuery
PhysicalSizeBytes: 0,
}
return tbl, nil
}
75 changes: 1 addition & 74 deletions runtime/drivers/bigquery/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Connection) Exec(ctx context.Context, stmt *drivers.Statement) error {
}

// InformationSchema implements drivers.OLAPStore.
func (c *Connection) InformationSchema() drivers.OLAPInformationSchema {
func (c *Connection) InformationSchema() drivers.InformationSchema {
return c
}

Expand Down Expand Up @@ -146,79 +146,6 @@ func (c *Connection) WithConnection(ctx context.Context, priority int, fn driver
return drivers.ErrNotImplemented
}

// All implements drivers.OLAPInformationSchema.
func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) {
return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c)
}

// LoadPhysicalSize implements drivers.OLAPInformationSchema.
func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error {
return nil
}

// LoadDDL implements drivers.OLAPInformationSchema.
func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error {
client, err := c.getClient(ctx)
if err != nil {
return err
}

q := fmt.Sprintf("SELECT ddl FROM `%s.%s.INFORMATION_SCHEMA.TABLES` WHERE table_name = @name", table.Database, table.DatabaseSchema)
cq := client.Query(q)
cq.Parameters = []bigquery.QueryParameter{
{Name: "name", Value: table.Name},
}

it, err := cq.Read(ctx)
if err != nil {
return err
}

var row struct {
DDL string `bigquery:"ddl"`
}
err = it.Next(&row)
if err != nil {
return err
}
table.DDL = row.DDL
return nil
}

// Lookup implements drivers.OLAPInformationSchema.
func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) {
client, err := c.getClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get BigQuery client: %w", err)
}

var table *bigquery.Table
if db != "" {
table = client.DatasetInProject(db, schema).Table(name)
} else {
table = client.Dataset(schema).Table(name)
}

meta, err := table.Metadata(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get table metadata: %w", err)
}
runtimeSchema, err := fromBQSchema(meta.Schema)
if err != nil {
return nil, err
}
tbl := &drivers.OlapTable{
Database: db,
DatabaseSchema: schema,
Name: name,
View: meta.Type == bigquery.ViewTable,
Schema: runtimeSchema,
UnsupportedCols: nil, // all columns are currently being mapped though may not be as specific as in BigQuery
PhysicalSizeBytes: 0,
}
return tbl, nil
}

func (c *Connection) Head(ctx context.Context, db, schema, table string, limit int64) (*drivers.Result, error) {
client, err := c.getClient(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion runtime/drivers/clickhouse/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (c *Connection) QuerySchema(ctx context.Context, query string, args []any)
return schema, nil
}

func (c *Connection) InformationSchema() drivers.OLAPInformationSchema {
func (c *Connection) InformationSchema() drivers.InformationSchema {
return c
}

Expand Down
85 changes: 85 additions & 0 deletions runtime/drivers/databricks/information_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/pagination"
)
Expand Down Expand Up @@ -178,6 +179,90 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab
return t, nil
}

// All implements drivers.OLAPInformationSchema.
func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) {
return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c)
}

// LoadPhysicalSize implements drivers.OLAPInformationSchema.
func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error {
return nil
}

// LoadDDL implements drivers.OLAPInformationSchema.
func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error {
db, err := c.getDB(ctx)
if err != nil {
return err
}

fqn := DialectDatabricks.EscapeTable(table.Database, table.DatabaseSchema, table.Name)

objectType := "TABLE"
if table.View {
objectType = "VIEW"
}

var ddl string
err = db.QueryRowContext(ctx, fmt.Sprintf("SHOW CREATE %s %s", objectType, fqn)).Scan(&ddl)
if err != nil {
return err
}
table.DDL = ddl
return nil
}

// Lookup implements drivers.OLAPInformationSchema.
func (c *connection) Lookup(ctx context.Context, database, schema, name string) (*drivers.OlapTable, error) {
prefix := catalogPrefix(database)
q := fmt.Sprintf(`
SELECT
CASE WHEN t.table_type = 'VIEW' THEN true ELSE false END AS is_view,
c.column_name,
c.data_type
FROM %sinformation_schema.tables t
JOIN %sinformation_schema.columns c
ON t.table_schema = c.table_schema AND t.table_name = c.table_name
WHERE t.table_schema = ? AND t.table_name = ?
ORDER BY c.ordinal_position
`, prefix, prefix)

conn, err := c.getDB(ctx)
if err != nil {
return nil, err
}

rows, err := conn.QueryContext(ctx, q, schema, name)
if err != nil {
return nil, err
}
defer rows.Close()

var isView bool
var fields []*runtimev1.StructType_Field
var colName, colType string
for rows.Next() {
if err := rows.Scan(&isView, &colName, &colType); err != nil {
return nil, err
}
fields = append(fields, &runtimev1.StructType_Field{
Name: colName,
Type: databaseTypeToPB(colType),
})
}
if err := rows.Err(); err != nil {
return nil, err
}

return &drivers.OlapTable{
Database: database,
DatabaseSchema: schema,
Name: name,
View: isView,
Schema: &runtimev1.StructType{Fields: fields},
}, nil
}

// catalogPrefix returns "<catalog>." if catalog is non-empty, or "" otherwise.
func catalogPrefix(catalog string) string {
if catalog == "" {
Expand Down
Loading
Loading