LakeFS
объединяет лучшие практики разработки программного обеспечения и применяет их к работе с данными.
LakeFS
обеспечивает контроль версий над озером данных и использует семантику, подобную git
, для фиксации версий данных и доступа к ним. Если вы знаете git
, с LakeFS
вы будете чувствовать себя привычно.
С помощью LakeFS
вы можете использовать в своем озере данных такие концепции, как:
branch
– для создания изолированной версии данных;commit
– для создания воспроизводимого снимка данных;merge
– чтобы объединять ваши изменения в одно атомарное состояние.У вас есть набор данных, и вы хотите увидеть, как он меняется с течением времени? Один из вариантов — сохранять полную копию всех данных при каждом изменении одного файла. Лучше всего это работает для небольших наборов данных.
Более экономичный и инкрементный подход к управлению версиями заключается в добавлении и поддержании двух полей метаданных в табличном наборе данных, часто называемых valid_from
и valid_to
.
При обновлении записи в этом наборе данных мы никогда не перезаписываем существующую запись. Вместо этого мы добавляем новые записи и обновляем поле valid_to
до текущей отметки времени для любой записи, которая была бы перезаписана.
Помимо того, что вы можете реализовать это в своих собственных ETL пайплайнах, это также, в частности, подход, который использует SQL Server для временных таблиц и то, что использует dbt для создания снимков.
Если первые два подхода можно резюмировать так: «Давайте добавим немного контроля версий к уже имеющимся у меня данным», то теперь пришло время совершенно иного подхода.
А что если относиться к управлению версиями данных как к объекту первого класса? Как к базовому и неотъемлемому свойству любых данных?
Чтобы сделать это возможным – как видно из ограничений вышеупомянутых подходов – необходимо, чтобы инструмент:
LakeFS решает эти задачи становясь некоторым уровнем абстракции над физическим объектным хранилищем данных.
Как только, вы настраиваете LakeFS над любым объектным хранилищем (либо же над файловой системой для тестов), вы больше не используете физические пути до объектов, LakeFS становится вашей файловой системой (FS). При этом LakeFS ничего не хранит данные на своей стороне, а отдаёт это внешнему хранилищу, таким образом выполняя функцию только интерфейса.
Graveler File
)
Как сказано в блоге LakeFS:
These are lakeFS internals and you do not need to know any of the details below in order to use lakeFS at any level.
На из-за этого мне ещё больше захотелось об этом рассказать. :)
Для инженера данных иногда может показаться, что LakeFS изо всех сил старается спрятать ваши данные. Действительно, один очень распространенный вопрос канала LakeFS #help в Slack — это вежливая вариация на тему «где мои данные?!».
LakeFS и правда выполняет роль абстракции поверх хранилища данных и делает это настолько хорошо, что найти сырые данные иногда становится сложно..
main/allstar_games_stat.csv
data
?..Есть несколько вариантов установки:
lakefs
и один для их консольной утилиты lakectl
.docker
, ибо есть официальный образ – treeverse/lakefs
, через который сразу можно использовать и lakefs
и lakectl
.Если мы говорим про production сервис, выделенный под lakefs
, то лучше использовать вариант с архивом, чтоб не добавлять абстракцию там, где она не нужна.
Для всего остального лучше использовать docker
.
Таким образом LakeFS запустится локально, без использования внешнего объектного хранилища, и без БД, в которой должны храниться наши метаданные. Это отличный старт, чтобы просто познакомиться с инструментом, но не рекомендую так в production делать. :)
docker-compose
version: "3.5"
name: lakefs-example
services:
lakefs:
image: treeverse/lakefs:1
pull_policy: always
ports:
- "8000:8000"
depends_on:
postgres:
condition: service_healthy
minio-setup:
condition: service_completed_successfully
environment:
- LAKEFS_BLOCKSTORE_TYPE=s3
- LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE=true
- LAKEFS_BLOCKSTORE_S3_ENDPOINT=http://minio:9000
- LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID=minioadmin
- LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY=minioadmin
- LAKEFS_AUTH_ENCRYPT_SECRET_KEY=some random secret string
- LAKEFS_LOGGING_LEVEL=INFO
- LAKEFS_STATS_ENABLED=${LAKEFS_STATS_ENABLED:-1}
- LAKECTL_CREDENTIALS_ACCESS_KEY_ID=AKIAIOSFOLKFSSAMPLES
- LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
- LAKECTL_SERVER_ENDPOINT_URL=http://localhost:8000
- LAKEFS_DATABASE_TYPE=postgres
- LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@postgres/postgres?sslmode=disable
entrypoint: ["/bin/sh", "-c"]
command:
- |
lakefs setup --user-name everything-bagel --access-key-id "$$LAKECTL_CREDENTIALS_ACCESS_KEY_ID" --secret-access-key "$$LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY" || true
lakefs run &
echo "---- Creating repository ----"
wait-for -t 60 lakefs:8000 -- curl -u "$$LAKECTL_CREDENTIALS_ACCESS_KEY_ID":"$$LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY" -X POST -H "Content-Type: application/json" -d '{ "name": "quickstart", "storage_namespace": "s3://quickstart", "default_branch": "main", "sample_data": true }' http://localhost:8000/api/v1/repositories || true
# wait-for -t 60 lakefs:8000 -- lakectl repo create lakefs://example s3://example || true
echo ""
wait-for -t 60 minio:9000 && echo '------------------------------------------------
MinIO admin: http://127.0.0.1:9001/
Username : minioadmin
Password : minioadmin
'
echo "------------------------------------------------"
echo ""
echo "lakeFS Web UI: http://127.0.0.1:8000/ >(._.)<"
echo " ( )_ "
echo ""
echo " Access Key ID : $$LAKECTL_CREDENTIALS_ACCESS_KEY_ID"
echo " Secret Access Key: $$LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"
echo ""
echo "-------- Let's go and have axolotl fun! --------"
echo ""
wait
minio-setup:
image: minio/mc:RELEASE.2023-05-18T16-59-00Z
environment:
- MC_HOST_lakefs=http://minioadmin:minioadmin@minio:9000
depends_on:
- minio
volumes:
- ./data:/data
entrypoint: ["/bin/sh", "-c"]
command:
- |
mc mb lakefs/quickstart lakefs/example lakefs/sample-data
exit 0
minio:
image: minio/minio:RELEASE.2023-05-18T00-05-36Z
ports:
- "9000:9000"
- "9001:9001"
entrypoint: ["minio", "server", "/data", "--console-address", ":9001"]
postgres:
image: postgres:14
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres"]
interval: 10s
retries: 5
start_period: 5s
restart: always
networks:
default:
name: bagel
lakectl
Для полного списка команд можно посмотреть документацию, а здесь посмотрим на то, чем вы будете пользоваться 99% времени.
Python
Инструмент продолжает активно развиваться, поэтому можно столкнуться с тем, что есть старые библиотеки на python, которые выполнили интеграцию с lakefs (в том числе от самих создателей). Однако, работа с lakefs мало чем отличается от работы с любым объектным хранилищем, так что всегда можно использовать любый понравившийся вам S3 клиент.
Например, самый стандартный вариант это библиотека boto3
.
import boto3
s3 = boto3.client('s3',
endpoint_url='https://lakefs.example.com',
aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')
with open('/local/path/to/file_0', 'rb') as f:
s3.put_object(Body=f, Bucket='example-repo', Key='main/example-file.parquet')
list_resp = s3.list_objects_v2(Bucket='example-repo', Prefix='c7a632d74f/example-prefix')
for obj in list_resp['Contents']:
print(obj['Key'])
s3.head_object(Bucket='example-repo', Key='main/example-file.parquet')
Однако, помимо этого, авторами LakeFS сейчас поддерживаются 2 библиотеки для Python, реализующих слегка разный уровень абстракции:
lakefs
– разработка авторов.lakefs-spec
– разработка сообщества, поддерживаемая авторами.Рассмотрим их чуток подробнее.
lakefs
В этой библиотеке реализованы все необходимые операции, которые чуток проще использовать, чем писать обращения через boto3
, в остальном, полностью похожий подход.
from lakefs.client import Client
clt = Client(
host="http://localhost:8000",
username="AKIAIOSFODNN7EXAMPLE",
password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)
repo = lakefs.Repository("example-repo", client=clt).create(storage_namespace="s3://storage-bucket/repos/example-repo")
branch1 = lakefs.repository("example-repo").branch("experiment1").create(source_reference_id="main")
obj = branch1.object(path="text/sample_data.txt").upload(content_type="text/plain", data="This is my object data")
print(obj.stats())
print(obj.reader(mode='r').read())
obj = branch1.object(path="raw/file1.data").upload(data=b"Hello Object World", pre_sign=True)
print(obj.stats())
for diff in branch1.uncommitted():
print(diff)
ref = branch1.commit(message='Add some data!', metadata={'using': 'python_sdk'})
print(ref.get_commit())
main = repo.branch("main")
for diff in main.diff(other_ref=branch1):
print(diff)
res = branch1.merge_into(main)
print(res)
При необходимости, можно изучить документацию.
lakefs-spec
Тут решили реализовать слегка иной подход, использующий filesystem_spec
для Python. Таким образом, скрывая полностью работу с удаленным объектным хранилищем за простой работой с файловой системой.
from pathlib import Path
from lakefs_spec import LakeFSFileSystem
REPO, BRANCH = "repo", "main"
# Prepare example local data
local_path = Path("demo.txt")
local_path.write_text("Hello, lakeFS!")
# Upload the local file to the repo and commit
fs = LakeFSFileSystem() # will auto-discover credentials from ~/.lakectl.yaml
repo_path = f"{REPO}/{BRANCH}/{local_path.name}"
with fs.transaction(REPO, BRANCH) as tx:
fs.put(str(local_path), f"{REPO}/{tx.branch.id}/{local_path.name}")
tx.commit(message="Add demo data")
# Read back the file contents
f = fs.open(repo_path, "rt")
print(f.readline()) # prints "Hello, lakeFS!"
# Compare the sizes of local file and repo
file_info = fs.info(repo_path)
print(
f"{local_path.name}: local size: {file_info['size']}, remote size: {local_path.stat().st_size}"
)
# Get information about all files in the repo root
print(fs.ls(f"{REPO}/{BRANCH}/"))
# Delete uploaded file from the repository (and commit)
with fs.transaction(REPO, BRANCH) as tx:
fs.rm(f"{REPO}/{tx.branch.id}/{local_path.name}")
tx.commit(message="Delete demo data")
local_path.unlink()
Работает достаточно стабильно, но всё же, обращаю ещё раз внимание, что хоть авторы LakeFS и поддерживают этот проект, они рекомендуют всё же использовать их библиотеку.
Git
через lakectl local
Модуль local
из lakectl
позволяет работать с данными из LakeFS локально.
При помощи этого модуля данные из LakeFS клонируются в локальную папку, которая затем синхронизируется с удаленным репозиторием LakeFS. Это достигается тем, что в выбранную папку помешается файл метаданных, в котором указано, с какой именно веткой нужно синхронизировать папку.
Вы можете синхронизировать сколько угодно репозиториев и веток из них, единственное ограничение, что одна папка может быть привязана только к одной ветке из одного репозитория.
$ lakectl local clone lakefs://is-alpaca/experiment-1/dataset/train/ input
Successfully cloned lakefs://is-alpaca/experiment-1/dataset/train/ to ~/ml_models/is_alpaca/input
Clone Summary:
Downloaded: 250
Uploaded: 0
Removed: 0
Посмотрим синхронизированные папки:
is_alpaca$ lakectl local list
+-----------+------------------------------------------------+------------------------------------------------------------------+
| DIRECTORY | REMOTE URI | SYNCED COMMIT |
+-----------+------------------------------------------------+------------------------------------------------------------------+
| input | lakefs://is-alpaca/experiment-1/dataset/train/ | 589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3 |
+-----------+------------------------------------------------+------------------------------------------------------------------+
is_alpaca$ git add input/
is_alpaca$ git status
On branch experiment-1
Changes to be committed:
(use "git restore --staged <file>..." to unstage)
new file: input/.lakefs_ref.yaml
Changes not staged for commit:
(use "git add <file>..." to update what will be committed)
(use "git restore <file>..." to discard changes in working directory)
modified: .gitignore
А что вообще находится в этом файле:
is_alpaca$ cat input/.lakefs_ref.yaml
src: lakefs://is-alpaca/experiment-1/dataset/train
at_head: 589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3
Можем закоммитить изменения в Git:
is_alpaca$ lakectl local status input
diff 'local:///ml_models/is_alpaca/input' <--> 'lakefs://is-alpaca/589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3/dataset/train/'...
diff 'lakefs://is-alpaca/589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3/dataset/train/' <--> 'lakefs://is-alpaca/experiment-1/dataset/train/'...
╔════════╦════════╦════════════════════════════╗
║ SOURCE ║ CHANGE ║ PATH ║
╠════════╬════════╬════════════════════════════╣
║ local ║ added ║ not_alpaca/axolotl2.jpeg ║
║ local ║ added ║ not_alpaca/axolotl3.png ║
║ local ║ added ║ not_alpaca/axolotl4.jpeg ║
╚════════╩════════╩════════════════════════════╝
Теперь можно сделать коммит:
is_alpaca$ lakectl local commit input -m "add images of axolotls to the training dataset"
Getting branch: experiment-1
diff 'local:///ml_models/is_alpaca/input' <--> 'lakefs://is-alpaca/589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3/dataset/train/'...
upload not_alpaca/axolotl3.png ... done! [5.04KB in 679ms]
upload not_alpaca/axolotl2.jpeg ... done! [38.31KB in 685ms]
upload not_alpaca/axolotl4.jpeg ... done! [7.70KB in 718ms]
Sync Summary:
Downloaded: 0
Uploaded: 3
Removed: 0
Finished syncing changes. Perform commit on branch...
Commit for branch "experiment-1" completed.
ID: 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6
Message: add images of axolotls to the training dataset
Timestamp: 2024-02-08 17:41:20 +0200 IST
Parents: 589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3
is_alpaca$ lakectl local init lakefs://is-alpaca/main/dataset/test/ testDataset
Location added to /is_alpaca/.gitignore
Successfully linked local directory '/is_alpaca/testDataset' with remote 'lakefs://is-alpaca/main/dataset/test/'
is_alpaca$ lakectl local list
+-------------+-------------------------------------------------+------------------------------------------------------------------+
| DIRECTORY | REMOTE URI | SYNCED COMMIT |
+-------------+-------------------------------------------------+------------------------------------------------------------------+
| input | lakefs://is-alpaca/main/dataset/train/ | 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6 |
| testDataset | lakefs://is-alpaca/main/dataset/test/ | 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6 |
+-------------+-------------------------------------------------+------------------------------------------------------------------+
is_alpaca$ lakectl local status testDataset
diff 'local:///ml_models/is_alpaca/testDataset' <--> 'lakefs://is-alpaca/0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6/dataset/test/'...
diff 'lakefs://is-alpaca/0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6/dataset/test/' <--> 'lakefs://is-alpaca/main/dataset/test/'...
╔════════╦════════╦════════════════════════════════╗
║ SOURCE ║ CHANGE ║ PATH ║
╠════════╬════════╬════════════════════════════════╣
║ local ║ added ║ alpaca/alpaca (1).jpg ║
║ local ║ added ║ alpaca/alpaca (10).jpg ║
. . .
. . .
. . .
║ local ║ added ║ not_alpaca/not_alpaca (9).jpg ║
╚════════╩════════╩════════════════════════════════╝
is_alpaca$ lakectl local commit testDataset -m "add is_alpaca test dataset to lakeFS"
Getting branch: experiment-1
...
Finished syncing changes. Perform commit on branch...
Commit for branch "experiment-1" completed.
ID: c8be7f4f5c13dd2e489ae85e6f747230bfde8e50f9cd9b6af20b2baebfb576cf
Message: add is_alpaca test dataset to lakeFS
Timestamp: 2024-02-10 12:31:53 +0200 IST
Parents: 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6
LakeFS далеко не единственный инструмент, который решает вопрос версионирования данных, однако, как мне кажется, его подход достоин того, чтобы рассмотреть этот инструмент и познакомиться с ним.
Он не добавляет особо нагрузку на вашу уже существующую инфрастуктуру, при этом даёт все необходимые возможности.