Cloudera流分析中引入FlinkSQL+ 查看更多
Cloudera流分析中引入FlinkSQL
+ 查看更多
-- Click count per user and session window: rows from `clicks` are grouped by
-- userId and by a session window on clicktime with a 30-minute inactivity gap.
SELECT
    userId,
    -- COUNT is a reserved keyword in Flink SQL, so the alias has to be quoted.
    COUNT(*) AS `count`,
    SESSION_START(clicktime, INTERVAL '30' MINUTE)
FROM clicks
GROUP BY
    -- The comma between the two grouping expressions was missing.
    SESSION(clicktime, INTERVAL '30' MINUTE),
    userId;
1) 在流处理领域中,有多少业务逻辑可以用SQL来表达?
2) 这将如何改变流式作业从开发到生产的整个过程?
3) 这将如何影响数据工程团队的职责范围?
-- Kafka-backed table of item transactions read as JSON from topic
-- 'transaction.log.1', starting at the earliest offset.
-- The CREATE TABLE header was missing; the table name is taken from the
-- queries that read from ItemTransactions.
CREATE TABLE ItemTransactions (
    transactionId BIGINT,
    `timestamp` BIGINT,
    itemId STRING,
    quantity INT,
    -- Computed event-time column: `timestamp` is divided by 1000 before
    -- from_unixtime, i.e. it is presumably epoch milliseconds (confirm against
    -- the producer).
    event_time AS CAST(from_unixtime(floor(`timestamp`/1000)) AS TIMESTAMP(3)),
    -- Watermark trails event_time by 5 seconds.
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'transaction.log.1',
    'connector.startup-mode' = 'earliest-offset',
    -- NOTE(review): the broker list is empty here; fill in the Kafka bootstrap
    -- servers before running.
    'connector.properties.bootstrap.servers' = '',
    'format.type' = 'json'
);
-- Sample ten rows from ItemTransactions.
SELECT *
FROM ItemTransactions
LIMIT 10;
-- Summed quantity per item over 10-second tumbling windows on event_time.
SELECT
    TUMBLE_START(event_time, INTERVAL '10' SECOND) AS window_start,
    itemId,
    SUM(quantity) AS volume
FROM ItemTransactions
GROUP BY
    itemId,
    TUMBLE(event_time, INTERVAL '10' SECOND);
-- Top 3 items by transaction count within each 10-minute tumbling window.
-- The outer SELECT wrapper was missing, which left the parentheses unbalanced.
-- It is also required because the rownum alias cannot be filtered in the WHERE
-- clause of the same SELECT that defines it.
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY window_start
            ORDER BY num_transactions DESC
        ) AS rownum
    FROM (
        -- Transaction count per item and 10-minute window.
        SELECT
            TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start,
            itemId,
            COUNT(*) AS num_transactions
        FROM ItemTransactions
        GROUP BY
            itemId,
            TUMBLE(event_time, INTERVAL '10' MINUTE)
    )
)
WHERE rownum <= 3;

分享到: