首页 >> 大全

Apache Calcite官方文档中文版- 进阶-3

2023-10-06 大全 24 作者:考证青年

第二部分 进阶() 3. 流()

扩展了SQL和关系代数以支持流式查询

3.1 简介

流是收集到持续不断流动的记录,永远不停止。与表不同,它们通常不存储在磁盘上,而流是通过网络,并在内存中保存很短的时间。

数据流是对表格的补充,因为它们代表了企业现在和将来发生的事情,而表格代表了过去。一个流被存档到一个表中是很常见的。

与表一样,您经常希望根据关系代数以高级语言查询流,根据模式()进行验证,并优化以充分利用可用的资源和算法。

的SQL是对标准SQL的扩展,而不是另一种“类SQL”的语言。区别很重要,原因如下:

如果不使用关键字,则返回常规标准SQL。

3.2 示例

流式SQL使用以下:

3.3 简单查询

最简单的流式查询:

SELECT STREAM *
FROM Orders;rowtime | productId | orderId | units
----------+-----------+---------+-------10:17:00 |        30 |       5 |     410:17:05 |        10 |       6 |     110:18:05 |        20 |       7 |     210:18:07 |        30 |       8 |    2011:02:00 |        10 |       9 |     611:04:00 |        10 |      10 |     111:09:30 |        40 |      11 |    1211:24:11 |        10 |      12 |     4

该查询读取流中的所有列和行。与任何流式查询一样,它永远不会终止。只要记录到达,它就会输出一条记录。

输入-C以终止查询。

关键字是SQL流的主要扩展。它告诉系统你对订单有兴趣,而不是现有订单。

查询:

SELECT *
FROM Orders;rowtime | productId | orderId | units
----------+-----------+---------+-------08:30:00 |        10 |       1 |     308:45:10 |        20 |       2 |     109:12:21 |        10 |       3 |    1009:27:44 |        30 |       4 |     24 records returned.

也是有效的,但会打印出现有的所有订单,然后终止。我们把它称为关系查询,而不是流式处理。它具有传统的SQL语义。

很特殊,因为它有一个流和一个表。如果您尝试在表上运行流式查询或在流上运行关系式查询,则会抛出一个错误:

SELECT * FROM Shipments;ERROR: Cannot convert stream 'SHIPMENTS' to a tableSELECT STREAM * FROM Products;ERROR: Cannot convert table 'PRODUCTS' to a stream

3.4 过滤行

与常规的SQL中一样,使用一个WHERE子句来过滤行:

SELECT STREAM *
FROM Orders
WHERE units > 3;rowtime | productId | orderId | units
----------+-----------+---------+-------10:17:00 |        30 |       5 |     410:18:07 |        30 |       8 |    2011:02:00 |        10 |       9 |     611:09:30 |        40 |      11 |    12
11:24:11 |        10 |      12 |     4

3.5 表达式投影

在子句中使用表达式来选择要返回或计算表达式的列:

SELECT STREAM rowtime,'An order for ' || units || ' '|| CASE units WHEN 1 THEN 'unit' ELSE 'units' END|| ' of product #' || productId AS description
FROM Orders;rowtime | description
----------+---------------------------------------10:17:00 | An order for 4 units of product #3010:17:05 | An order for 1 unit of product #1010:18:05 | An order for 2 units of product #2010:18:07 | An order for 20 units of product #3011:02:00 | An order by 6 units of product #1011:04:00 | An order by 1 unit of product #1011:09:30 | An order for 12 units of product #4011:24:11 | An order by 4 units of product #10

我们建议您始终在条款中包含列。在每个流和流式查询中有一个有序的时间戳,可以在稍后进行高级计算,例如GROUP BY和JOIN。

3.6 滚动窗口

有几种方法可以计算流上的聚合函数。差异是:

窗口类型:

SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,productId,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY CEIL(rowtime TO HOUR), productId;rowtime | productId |      c | units
------------+---------------+------------+-------11:00:00 |     30 |       2 |    2411:00:00 |     10 |       1 |     111:00:00 |     20 |       1 |     712:00:00 |     10 |       3 |    1112:00:00 |     40 |       1 |    12

结果是流。在11点整,发出自10点以来一直到11点有下订单的的小计。12点,它会发出11:00至12:00之间的订单。每个输入行只贡献到一个输出行。

是如何知道10:00:00的小计在11:00:00完成的,这样就可以发出它们了?它知道是在增加,而且它也知道CEIL( TO HOUR)在增加。所以,一旦在11:00:00时间点或之后看到一行,它将永远不会看到贡献到上午10:00:00的一行。

增加或减少的列以及表达式是单调的。(单调递增或单调递减)

如果列或表达式的值具有轻微的失序,并且流具有用于声明特定值将不会再被看到的机制(例如标点符号或水印),则该列或表达式被称为准单调。

在GROUP BY子句中没有单调或准单调表达式的情况下,无法取得进展,并且不允许查询:

SELECT STREAM productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY productId;ERROR: Streaming aggregation requires at least one monotonic expression

单调和准单调的列需要在模式中声明。当记录输入流并且由从该流中读取数据的假定查询时,单调性被强制执行。我们建议为每个流指定一个时间戳列,但也可以声明其他列是单调的,例如。

我们将在下面的内容讨论标点符号,水印,并取得进展的其他方法。

3.7 滚动窗口,改进

前面的滚动窗口的例子很容易写,因为窗口是一个小时。对于不是整个时间单位的时间间隔,例如2小时或2小时17分钟,则不能使用CEIL,表达式将变得更复杂。

支持滚动窗口的替代语法:

SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,productId,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;rowtime | productId |       c | units
----------+-----------+---------+-------11:00:00 |        30 |       2 |    2411:00:00 |        10 |       1 |     111:00:00 |        20 |       1 |     712:00:00 |        10 |       3 |    1112:00:00 |        40 |       1 |    12

正如你所看到的,它返回与前一个查询相同的结果。函数返回一个分组键,这个分组键在给定的汇总行中将会以相同的方式结束;函数采用相同的参数并返回该窗口的结束时间; 当然还有一个函数。

有一个可选参数来对齐窗口。在以下示例中,我们使用30分钟间隔和0:12作为对齐时间,因此查询在每小时过去12分钟和42分钟时发出汇总:

SELECT STREAMTUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,productId,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),productId;rowtime | productId |       c | units
----------+-----------+---------+-------10:42:00 |        30 |       2 |    2410:42:00 |        10 |       1 |     110:42:00 |        20 |       1 |     711:12:00 |        10 |       2 |     711:12:00 |        40 |       1 |    1211:42:00 |        10 |       1 |     4

3.8 跳转窗口

跳转窗口是滚动窗口的泛化(概括),它允许数据在窗口中保持比发出间隔更长的时间。

查询发出的行的时间戳11:00,包含数据从08:00至11:00(或10:59.9);以及行的时间戳12:00,包含数据从09:00至12:00。

SELECT STREAMHOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) AS rowtime,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR);rowtime |        c | units
----------+----------+-------11:00:00 |        4 |    2712:00:00 |        8 |    50

在这个查询中,因为保留期是发出期的3倍,所以每个输入行都贡献到3个输出行。想象一下,HOP函数为传入行生成一组Group Keys,并将其值存储在每个Group Key的累加器中。例如,HOP(10:18:00, '1' HOUR, '3')产生3个时间间隔周期:

[08:00, 09:00)

[09:00, 10:00)

[10:00, 11:00)

这就提出了允许不满意内置函数HOP和的用户来自定义的分区函数的可能性。

我们可以建立复杂的复杂表达式,如指数衰减的移动平均线:

SELECT STREAM HOP_END(rowtime),productId,SUM(unitPrice * EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))/ SUM(EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))

发出:

SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,productId
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId
HAVING COUNT(*) > 2 OR SUM(units) > 10;rowtime | productId
----------+-----------
10:00:00 |        30
11:00:00 |        10

3.11 子查询,视图和SQL闭包属性

前述的查询可以使用WHERE子查询中的子句来表示:

SELECT STREAM rowtime, productId
FROM (SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,productId,COUNT(*) AS c,SUM(units) AS suFROM OrdersGROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
WHERE c > 2 OR su > 10;rowtime | productId
----------+-----------10:00:00 |        3011:00:00 |        1011:00:00 |        40

子句是在SQL早期引入的,当需要在聚合之后执行过滤器时,(回想一下,WHERE在输入到达GROUP BY子句之前过滤行)。

从那时起,SQL已经成为一种数学封闭的语言,这意味着您可以在一个表上执行的任何操作也可以在查询上执行。

SQL的闭包属性非常强大。它不仅使陈旧过时(或至少减少到语法糖),它使视图成为可能:

CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) ASSELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),productId,COUNT(*),SUM(units)FROM OrdersGROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;rowtime | productId
----------+-----------10:00:00 |        3011:00:00 |        1011:00:00 |        40

FROM子句中的子查询有时被称为“内联视图”,但实际上它们比视图更基础。视图只是一个方便的方法,通过给出这些分片命名并将它们存储在元数据存储库中,将SQL分割成可管理的块。

很多人发现嵌套的查询和视图在流上比在关系上更有用。流式查询是连续运行的运算符的管道,而且这些管道通常会很长。嵌套的查询和视图有助于表达和管理这些管道。

顺便说一下,WITH子句可以完成与子查询或视图相同的操作:

WITH HourlyOrderTotals (rowtime, productId, c, su) AS (SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),productId,COUNT(*),SUM(units)FROM OrdersGROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;rowtime | productId
------------+-----------10:00:00 |        3011:00:00 |        1011:00:00 |        40

3.12 流和关系之间的转换

回顾一下视图的定义。此视图是流还是关系?

它不包含关键字,所以它是一个关系。但是,这是一种可以转换成流的关系。

可以在关系和流式查询中使用它:

# A relation; will query the historic Orders table.
# Returns the largest number of product #10 ever sold in one hour.
SELECT max(su)
FROM HourlyOrderTotals
WHERE productId = 10;# A stream; will query the Orders stream.
# Returns every hour in which at least one product #10 was sold.
SELECT STREAM rowtime
FROM HourlyOrderTotals
WHERE productId = 10;

这种方法不限于视图和子查询。遵循CQL [1]中规定的方法,流式SQL中的每个查询都被定义为关系查询,并最上面的使用关键字转换为流。

如果关键字存在于子查询或视图定义中,则不起作用。

在查询准备时间,计算查询中引用的关系是否可以转换为流或历史的关系。

有时候,一个流可以提供它的一些历史记录(比如 Kafka [2]主题中最后24小时的数据),但不是全部。在运行时,计算出是否有足够的历史记录来运行查询,如果没有,则会给出错误。

进阶免费下载_进阶下载_

3.13“饼图”问题:流上的关系查询

一个特定的情况下,需要将流转换为关系时会发生我所说的“饼图问题”。想象一下,你需要写一个带有图表的网页,如下所示,它总结了每个产品在过去一小时内的订单数量。

Apache Calcite官方文档中文版- 进阶-3. 流(Streaming)

但是这个流只包含几条记录,而不是一个小时的汇总。我们需要对流的历史记录运行一个关系查询:

SELECT productId, count(*)
FROM Orders
WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOURAND current_timestamp;

如果流的历史记录正在滚动到表中,尽管成本很高,我们可以回答查询。更好的办法是,如果我们可以告诉系统将一小时的汇总转化为表格,在流式处理过程中不断维护它,并自动重写查询以使用表格。

3.14 排序

ORDER BY的故事类似于GROUP BY。语法看起来像普通的SQL,但是必须确保它能够提供及时的结果。因此,它需要在ORDER BY键的前沿( edge)有一个单调的表达式。

SELECT STREAM CEIL(rowtime TO hour) AS rowtime, productId, orderId, units
FROM Orders
ORDER BY CEIL(rowtime TO hour) ASC, units DESC;rowtime | productId | orderId | units
----------+-----------+---------+-------10:00:00 |        30 |       8 |    2010:00:00 |        30 |       5 |     410:00:00 |        20 |       7 |     210:00:00 |        10 |       6 |     111:00:00 |        40 |      11 |    1211:00:00 |        10 |       9 |     611:00:00 |        10 |      12 |     411:00:00 |        10 |      10 |     1

大多数查询将按照插入的顺序返回结果,因为引使用流式算法,但不应该依赖它。例如,考虑一下:

SELECT STREAM *
FROM Orders
WHERE productId = 10
UNION ALL
SELECT STREAM *
FROM Orders
WHERE productId = 30;rowtime | productId | orderId | units
----------+-----------+---------+-------10:17:05 |        10 |       6 |     110:17:00 |        30 |       5 |     410:18:07 |        30 |       8 |    2011:02:00 |        10 |       9 |     611:04:00 |        10 |      10 |     111:24:11 |        10 |      12 |     4

= 30的行显然是不符合order要求的,可能是因为流以分区,分区后的流在不同的时间发送了他们的数据。

如果您需要特定的顺序,请添加一个显式的ORDER BY:

可能会通过合并使用实现UNION ALL,这样只是效率稍微低些。

只需要添加一个ORDER BY到最外层的查询。如果需要在UNION ALL之后执行GROUP BY,将会隐式添加ORDER BY,以便使GROUP BY算法成为可能。

3.15 表格构造器

子句创建一个拥有给定行集合的内联表。

流式传输是不允许的。这组行不会发生改变,因此一个流永远不会返回任何行。

> SELECT STREAM * FROM (VALUES (1, 'abc'));ERROR: Cannot stream VALUES

3.16 滑动窗口

标准SQL的功能特性之一可以在子句中使用所谓的“分析函数”。不像GROUP BY,不会折叠记录。对于每个进来的记录,出来一个记录。但是聚合函数是基于一个多行的窗口。

我们来看一个例子。

SELECT STREAM rowtime,productId,units,SUM(units) OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING) unitsLastHour

这个功能特性付出很小的努力就包含了很多Power。在子句中可以有多个函数,基于多个窗口规则定义。

以下示例返回在过去10分钟内平均订单数量大于上周平均订单数量的订单。

SELECT STREAM *
FROM (SELECT STREAM rowtime,productId,units,AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7FROM OrdersWINDOW product AS (ORDER BY rowtimePARTITION BY productId))

为了简洁起见,在这里我们使用一种语法,其中使用子句部分定义窗口,然后在每个OVER子句中细化窗口。也可以定义子句中的所有窗口,或者如果您愿意,可以定义所有内联窗口。

但真正的power超越语法。在幕后,这个查询维护着两个表,并且使用FIFO队列添加和删除子汇总中的值。但是,无需在查询中引入联接,也可以访问这些表。

窗口化聚合语法的一些其他功能特性:

我记得一些计划器的元数据(成本指标):

这个流按给定的一个或多个属性排序吗?是否可以对给定属性的流进行排序?(对于有限的关系,答案总是“是”;对于流,它依赖于的存在,或属性和排序键之间的联系)。我们需要引入什么延迟才能执行此类操作?执行此类操作的成本(CPU,内存等)是多少?

在.中,我们已经有了(1)。对于(2),答案对于有限关系总是“true”。但是我们需要为流实现(2),(3)和(4)。 3.22 流的状态

并非本文中的所有概念都已经在中实现。其他的可能在中实现,但不能在 [3] [4]等特定的适配器中实现。

已实现

未实现

本文档中提供的以下功能特性,以为支持它们,但实际上它还没有实现。全面支持意味着参考实现支持该功能特性(包括负面情况),TCK则对其进行测试。

本文档做了什么

3.23 函数

以下函数在标准SQL中不存在,但在流式SQL中定义。

标量函数:

分区函数:

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了