WEBVTT

00:00.000 --> 00:28.160
Okay, as you're getting settled in: for our next speaker, we have Nicoleta from Fresha,

00:28.160 --> 00:33.560
that is the name of the company, and she will be talking about query federation and OLAP databases.

01:03.560 --> 01:31.520
All right, we'll get started. Okay, no. Hello? No.

01:31.520 --> 01:54.640
Hello, I am Nicoleta, I come from London, and I will talk today about query federation

01:54.640 --> 02:01.960
in modern OLAP databases, right? So I want to go on a journey together

02:01.960 --> 02:23.600
with you on what query federation is. Okay, so, yeah, okay, all right. So today I will

02:23.600 --> 02:31.320
take you on a journey through what query federation is, why it is important to

02:31.320 --> 02:37.400
be able to have unified SQL access across various data sources, and why query federation

02:37.400 --> 02:43.560
is very compelling for querying data lakes, in a way that allows us to build

02:43.560 --> 02:51.520
a unified lakehouse architecture. So let's start simple, with what query federation is. Essentially,

02:51.520 --> 02:57.520
it's just a way to query various data sources without needing to replicate data, without needing

02:57.520 --> 03:03.320
to copy over anything or bring everything into a single place. It's a way to simplify data

03:03.320 --> 03:09.120
architectures. So without query federation, if you have multiple data sources that you need

03:09.120 --> 03:15.600
to query, what you have to do is just query the first data source, for example,

03:15.600 --> 03:24.600
query the second data source, and then just join them manually, either in the backend app or

03:24.600 --> 03:31.600
as a data analyst in some sort of spreadsheet. But at the same time, another possibility,

03:31.600 --> 03:36.200
as I was mentioning, is to just bring everything into a data warehouse, right? And I think

03:36.200 --> 03:41.400
this kind of diagram is quite familiar to a lot of people because it has this idea of bringing

03:41.400 --> 03:48.800
the operational realm, from the data sources, into the analytical estate through some ingestion

03:48.800 --> 03:53.200
layer, and from there analytics engineers would take over and model the data.

03:53.200 --> 04:00.000
The problem with this is that the data stays within this internal format of the warehouse

04:00.000 --> 04:07.200
and if the warehouse itself doesn't support some use cases that you require, you need

04:07.200 --> 04:12.600
to get the data out and process it elsewhere and build even more data pipelines that

04:12.600 --> 04:14.000
need to be maintained and so on.

04:14.000 --> 04:25.000
Oh, sorry, yes, I will do. Okay, is it better now?

04:25.000 --> 04:34.600
Sorry. Okay, I will just keep it like this. So, sorry about that, I can just repeat the introduction.

04:34.600 --> 04:41.600
So essentially, with query federation we want to be able to not copy the data over into a

04:41.600 --> 04:50.600
single warehouse, and we also don't want to query it in each data source individually.

04:50.600 --> 04:58.200
Now this engine, the federated query engine, essentially does this work for us, right?

04:58.200 --> 05:07.200
So it will be able to split a federated query into subqueries, send those subqueries

05:07.200 --> 05:14.200
to the individual data sources, create an execution plan that allows

05:14.200 --> 05:20.200
this separation, and then collate the results within its internal engine.

05:20.200 --> 05:23.200
The user in the end gets the final results.
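
NOTE
A minimal sketch of the split, push-down, and collate flow described above. It is an illustration only, not how any particular engine implements federation. The two in-memory SQLite databases, the table names, and the helper names (make_source, federated_totals) are all hypothetical stand-ins for real sources.
```python
import sqlite3
def make_source(ddl, insert_sql, rows):
    # Each "source" is an independent in-memory SQLite database (a stand-in for a real system).
    conn = sqlite3.connect(":memory:")
    conn.execute(ddl)
    conn.executemany(insert_sql, rows)
    return conn
users_db = make_source("CREATE TABLE users (id INTEGER, name TEXT)", "INSERT INTO users VALUES (?, ?)", [(1, "ana"), (2, "bo"), (3, "cy")])
orders_db = make_source("CREATE TABLE orders (user_id INTEGER, amount REAL)", "INSERT INTO orders VALUES (?, ?)", [(1, 10.0), (1, 5.0), (2, 7.5), (3, 1.0)])
def federated_totals(min_amount):
    # Subquery 1: push the filter and the aggregation down to the orders source.
    sql = "SELECT user_id, SUM(amount) FROM orders WHERE amount >= ? GROUP BY user_id"
    totals = dict(orders_db.execute(sql, (min_amount,)).fetchall())
    if not totals:
        return []
    # Subquery 2: fetch only the user rows that the join actually needs.
    marks = ",".join("?" * len(totals))
    users = users_db.execute(f"SELECT id, name FROM users WHERE id IN ({marks})", list(totals)).fetchall()
    # Collate: join the partial results inside the "engine" and return the final rows.
    return sorted((name, totals[uid]) for uid, name in users)
print(federated_totals(5.0))
```
The caller only sees federated_totals; which source answered which part of the query stays hidden, which is the point of a federated engine.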

05:23.200 --> 05:26.200
Now there are some challenges because nothing comes for free.

05:26.200 --> 05:31.200
So when you federate across various sources you have the problem of performance which

05:31.200 --> 05:34.200
depends on the slowest source.

05:34.200 --> 05:41.200
Also, every database comes with its own data access patterns, security policies, and also

05:41.200 --> 05:48.200
network limitations when it comes to shuffling data when joining between nodes.

05:48.200 --> 05:58.200
So, for example, things like OLTP sources, or OpenSearch, or data lakes.

05:58.200 --> 06:04.200
How it actually looks in Trino, from a query execution perspective:

06:04.200 --> 06:10.200
So when a client fires a query, it reaches a coordinator. That coordinator will

06:10.200 --> 06:18.200
analyze it, and then the planner will return the best possible execution plan.

06:18.200 --> 06:21.200
And the resulting plan will be scheduled on various workers.

06:21.200 --> 06:28.200
Each worker will be tasked to connect to the source via the connector, get the results,

06:28.200 --> 06:33.200
shuffle the data if needed, join the results, and return the final result.

06:33.200 --> 06:40.200
Now, I mentioned OLAP databases, and as an example I gave StarRocks.

06:40.200 --> 06:45.200
But I'm not entirely sure how many of you have heard about StarRocks in the past.

06:45.200 --> 06:47.200
Okay, two, three.

06:47.200 --> 06:49.200
Okay, so it is an OLAP database.

06:49.200 --> 06:50.200
It is open source.

06:50.200 --> 06:53.200
It is essentially a fork of Doris.

06:54.200 --> 06:59.200
And it has an MPP processing engine, vectorized.

06:59.200 --> 07:04.200
And what is compelling about StarRocks is its MySQL protocol compatibility.

07:04.200 --> 07:10.200
Right, because that makes it very easy to integrate into existing ecosystems.

07:10.200 --> 07:18.200
It is also a federated query engine, and it shines in data lakehouse integrations.

07:18.200 --> 07:23.200
Now, in StarRocks, federation is very similar in a sense to Trino.

07:23.200 --> 07:25.200
But there are a couple of differences.

07:25.200 --> 07:34.200
So clients also send a federated query; the frontend cluster of nodes, which functions as a coordinator,

07:34.200 --> 07:39.200
gets the query, parses it, and again determines the best possible plan.

07:39.200 --> 07:45.200
And then it will send the plan to compute nodes for execution.

07:45.200 --> 07:51.200
Those compute nodes will also get the data, but while returning the results,

07:51.200 --> 07:55.200
most of these results will be kept in an internal data cache layer.

07:55.200 --> 08:00.200
And then those results can be reused in subsequent queries.

08:00.200 --> 08:07.200
Now, in a nutshell, if we want to put together these two paradigms, the idea is very simple.

08:07.200 --> 08:12.200
There are two different strategies for doing query federation.

08:12.200 --> 08:19.200
Trino will optimize for as many data sources as possible.

08:19.200 --> 08:25.200
And this is very useful for external, exploratory federation, ad hoc queries, and so on.

08:25.200 --> 08:35.200
But if you need something more like production OLAP, and to put these workloads in front of users,

08:35.200 --> 08:40.200
then probably the optimizations that OLAP engines do by default

08:40.200 --> 08:43.200
would be very useful and necessary.

08:43.200 --> 08:46.200
Why is federation important specifically?

08:46.200 --> 08:54.200
It's also because there is a compelling case for querying data lakehouses in addition to the internal formats of the databases.

08:54.200 --> 09:00.200
And this actually enables compute-storage separation,

09:00.200 --> 09:08.200
because storage is just in the cloud, in very cheap cloud storage,

09:08.200 --> 09:17.200
but compute is provisioned to process the data on demand, and only when it's required.

09:17.200 --> 09:25.200
This has led in recent years to these lakehouse architectures that are built on top of open table formats.

09:26.200 --> 09:35.200
Now, I will use Iceberg as an example, but any open table format might be used.

09:35.200 --> 09:40.200
So, in this case, Iceberg has very wide support in the industry,

09:40.200 --> 09:46.200
mostly because it has an open specification and has been adopted by very big companies at the moment, at least.

09:46.200 --> 09:56.200
But, in the end, it's just a series of metadata files and manifest files that point to some data files in storage.

09:56.200 --> 10:02.200
And readers need to actually rely on statistics, and understand those statistics, in order to be able to query them.

10:02.200 --> 10:13.200
So, there is this separation between the metadata plane and the data plane, and in order for the query engine to be able to read those data files,

10:13.200 --> 10:17.200
it needs to navigate across all that metadata.

10:17.200 --> 10:26.200
Now, federating to Iceberg can be done with both of these strategies, both patterns, right?

10:26.200 --> 10:32.200
The difference would be that Trino will have to scan tables directly in object storage,

10:32.200 --> 10:37.200
and that's why you will pay this price of opening the files every time.

10:37.200 --> 10:45.200
But StarRocks, on the other hand, because it's an OLAP engine, can bring the external data internally,

10:45.200 --> 10:52.200
and then cache results, cache metadata, optionally build materialized views on top of queries,

10:52.200 --> 10:57.200
and this essentially leads to minimizing remote reads.

10:57.200 --> 11:01.200
Now, the federation to Iceberg from StarRocks is very similar.

11:01.200 --> 11:07.200
The only addition is that you need this Iceberg catalog to be able to say:

11:07.200 --> 11:10.200
OK, I have a table name, where do I find the metadata?

11:10.200 --> 11:17.200
And once I have the metadata, I can get statistics, I can determine exactly which files to plan for,

11:17.200 --> 11:26.200
I can do partition pruning, and then I can just say: OK, I only need to scan very few files from the system.

11:26.200 --> 11:33.200
And then I can return the results to the user in a very performant manner.
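
NOTE
A rough sketch of the metadata-driven planning just described: use partition values and per-file column bounds to decide which data files need to be opened at all. The DataFile record is a deliberately simplified, hypothetical stand-in for a manifest entry, and the paths, field names, and plan_files helper are assumptions for illustration, not the Iceberg or StarRocks API.
```python
from dataclasses import dataclass
@dataclass(frozen=True)
class DataFile:
    # Hypothetical manifest entry: file path, partition value, and column bounds.
    path: str
    day: str
    min_amount: float
    max_amount: float
def plan_files(files, day, amount_at_least):
    # Partition pruning: skip files whose partition value cannot match.
    in_partition = [f for f in files if f.day == day]
    # Statistics pruning: skip files whose upper bound is below the predicate.
    return [f.path for f in in_partition if f.max_amount >= amount_at_least]
manifest = [
    DataFile("s3://lake/a.parquet", "2024-01-01", 0.0, 9.0),
    DataFile("s3://lake/b.parquet", "2024-01-01", 10.0, 99.0),
    DataFile("s3://lake/c.parquet", "2024-01-02", 50.0, 80.0),
]
print(plan_files(manifest, "2024-01-01", 50.0))
```
Only one of the three files survives planning here, so only that file would be read from object storage.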

11:33.200 --> 11:39.200
Now, how it looks in practice, I gave here an example, just to see how simple it actually is, right?

11:39.200 --> 11:47.200
Because in this query, which is really a query on top of a view, there is a join between an OLAP table,

11:47.200 --> 11:53.200
which is internal to StarRocks, and an Iceberg table, which sits in the data lake,

11:53.200 --> 11:57.200
and then there is a join, but the Iceberg scan itself is very quick.

11:57.200 --> 12:04.200
In this example, it's like 2 milliseconds, just because it relies heavily

12:04.200 --> 12:07.200
on the metadata cache and the data cache.

12:07.200 --> 12:15.200
And underneath this view, there are more tables that lead in the end to the data sources.

12:15.200 --> 12:20.200
Now, how StarRocks itself addresses the challenges that I was mentioning before:

12:20.200 --> 12:24.200
I already mentioned them, but this puts everything in context.

12:24.200 --> 12:33.200
There is object storage latency, which is solved by data and metadata caching; for data freshness,

12:33.200 --> 12:38.200
there are strategies, like hot/cold data separation, which I will explain in a moment.

12:38.200 --> 12:43.200
File fragmentation is solved by just caching, and materializing some of the views,

12:43.200 --> 12:49.200
such that you don't need to rely on compaction so much on table maintenance, essentially.

12:49.200 --> 12:55.200
And schema evolution just works by default, because StarRocks can asynchronously refresh metadata,

12:55.200 --> 13:01.200
so once the table gets updated in Iceberg, it will be updated in the example from Kafka,

13:01.200 --> 13:08.200
or maybe if we need to do stream processing, we will do the stream processing using Flink,

13:08.200 --> 13:11.200
then ingest it into StarRocks, for example.

13:11.200 --> 13:16.200
And federation sits in the middle there to say: okay, if a user runs a query

13:16.200 --> 13:23.200
that needs both very recent data, from things that happened like seconds ago,

13:23.200 --> 13:29.200
but also data that was ingested like a couple of years ago,

13:29.200 --> 13:31.200
that's very easy to do, right?

13:31.200 --> 13:37.200
You have query federation that can directly query the cold storage.

13:37.200 --> 13:43.200
The internal tables are already there, but we can also bring the cold data into the engine

13:43.200 --> 13:46.200
and serve it to the user as well.

13:46.200 --> 13:52.200
We can also bring in other sources, for example OpenSearch in this case, to say: okay,

13:52.200 --> 13:58.200
we have a query where we want to do full-text search based on users, for example.

13:58.200 --> 14:07.200
And I first query OpenSearch, then I enrich that with whatever was saved in the internal storage,

14:07.200 --> 14:12.200
and finally I append the historical results.
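
NOTE
A toy sketch of the three-step flow just described: search first, enrich the hits from hot internal storage, then append historical rows from cold storage. Plain Python dicts and lists stand in for OpenSearch, the internal tables, and the lake; the function names and row fields are hypothetical, not part of any real client library.
```python
def search_users(index, term):
    # Stand-in for a full-text search source: returns the ids of matching users.
    return [uid for uid, text in index.items() if term.lower() in text.lower()]
def federated_history(term, index, hot_rows, cold_rows):
    ids = set(search_users(index, term))
    # Enrich the search hits with recent rows held in internal (hot) storage.
    recent = [r for r in hot_rows if r["user_id"] in ids]
    # Append historical rows read from lake (cold) storage.
    historical = [r for r in cold_rows if r["user_id"] in ids]
    # Return one combined result, newest first.
    return sorted(recent + historical, key=lambda r: r["ts"], reverse=True)
index = {1: "Ana Smith", 2: "Bo Jones"}
hot = [{"user_id": 1, "ts": 200, "event": "login"}, {"user_id": 2, "ts": 210, "event": "login"}]
cold = [{"user_id": 1, "ts": 5, "event": "signup"}]
print([r["event"] for r in federated_history("ana", index, hot, cold)])
```
In a real engine this would be a single SQL statement over three catalogs; the sketch only shows the order in which the partial results are combined.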

14:12.200 --> 14:17.200
Now, if we zoom in on this whole ingestion layer,

14:17.200 --> 14:23.200
we can think of this architecture that we essentially build with one single OLAP data source,

14:23.200 --> 14:28.200
and you can see here that all these components are open source components, right?

14:28.200 --> 14:31.200
Everything can be built on top of open source components.

14:31.200 --> 14:36.200
With these, we can get an architecture that is essentially layered,

14:36.200 --> 14:43.200
so we have an ingestion layer that goes from Postgres, with the data then getting into Kafka;

14:43.200 --> 14:48.200
for example, from Kafka, it can be processed with Flink,

14:48.200 --> 14:54.200
and then ingested into various other sources,

14:54.200 --> 15:03.200
and then also from Kafka, it can reach the real-time layer backed by the StarRocks internal catalog.

15:03.200 --> 15:09.200
Now, in here, the common component is essentially this OLAP engine, right?

15:09.200 --> 15:15.200
And we can change the technologies, but the idea remains the same, right?

15:15.200 --> 15:20.200
If it's not StarRocks, it can be, for example, Apache Doris, or VeloDB, or something else,

15:20.200 --> 15:22.200
but the idea remains the same.

15:22.200 --> 15:29.200
There is a need for an OLAP database that can federate across both cold data

15:29.200 --> 15:35.200
and hot data that's internal, and maybe bring together within the engine,

15:35.200 --> 15:40.200
other external data sources, for example OpenSearch, or something else,

15:40.200 --> 15:48.200
and this gives unified query capabilities across these sources, such that

15:48.200 --> 15:57.200
you don't need separate systems anymore for, essentially, cold data

15:57.200 --> 16:02.200
versus hot data, for example; they can both be served from the same engine.

16:02.200 --> 16:08.200
If we wrap up with a couple of key takeaways, the idea is that, yes,

16:08.200 --> 16:13.200
query federation is very useful in orchestrating these queries across sources

16:13.200 --> 16:19.200
and providing unified SQL access, but it comes with trade-offs.

16:19.200 --> 16:22.200
So, in terms of performance, cost, and use cases.

16:22.200 --> 16:27.200
But the idea is that the industry is shifting towards cheap, usable, and universal storage,

16:27.200 --> 16:33.200
and the query engines as well need to adapt to be able to query these new lakehouse architectures.

16:33.200 --> 16:35.200
And thank you very much.

16:35.200 --> 16:38.200
I think I'm on time.

