LakeFS

Тимур Салимов — Techlead DS

Введение

LakeFS объединяет лучшие практики разработки программного обеспечения и применяет их к работе с данными.

LakeFS обеспечивает контроль версий над озером данных и использует семантику, подобную git, для фиксации версий данных и доступа к ним. Если вы знаете git, с LakeFS вы будете чувствовать себя привычно.

С помощью LakeFS вы можете использовать в своем озере данных такие концепции, как:

  • branch – для создания изолированной версии данных;
  • commit – для создания воспроизводимого снимка данных;
  • merge – чтобы объединять ваши изменения в одно атомарное состояние.

Как может быть реализовано версионирование данных?

Подход к управлению версиями №1: полное дублирование

У вас есть набор данных, и вы хотите увидеть, как он меняется с течением времени? Один из вариантов — сохранять полную копию всех данных при каждом изменении одного файла. Лучше всего это работает для небольших наборов данных.

Подход к управлению версиями № 2: метаданные «valid_from/to»

Более экономичный и инкрементный подход к управлению версиями заключается в добавлении и поддержании двух полей метаданных в табличном наборе данных, часто называемых valid_from и valid_to.

При обновлении записи в этом наборе данных мы никогда не перезаписываем существующую запись. Вместо этого мы добавляем новые записи и обновляем поле valid_to до текущей отметки времени для любой записи, которая была бы перезаписана.

Помимо того, что вы можете реализовать это в своих собственных ETL пайплайнах, это также, в частности, подход, который использует SQL Server для временных таблиц и то, что использует dbt для создания снимков.

Подход к управлению версиями №3: Контроль версий данных как объект первого класса

Если первые два подхода можно резюмировать так: «Давайте добавим немного контроля версий к уже имеющимся у меня данным», то теперь пришло время совершенно иного подхода.

А что если относиться к управлению версиями данных как к объекту первого класса? Как к базовому и неотъемлемому свойству любых данных?

Чтобы сделать это возможным – как видно из ограничений вышеупомянутых подходов – необходимо, чтобы инструмент:

  • Минимизировал объем хранилища при управлении версиями данных. Это означает, что не следует создавать копии объектов данных, которые остаются неизменными между версиями.
  • Предоставлял базовый набор операций, которые позволяют напрямую взаимодействовать с версиями. Такие как: «создать версию», «удалить версию», «сравнить две версии» и т.д.
  • Одинаково работал с любым масштабом данных, форматом данных, а также как со структурированными, так и с неструктурированными данными.

Версионирование данных в LakeFS

LakeFS решает эти задачи становясь некоторым уровнем абстракции над физическим объектным хранилищем данных.

Как только, вы настраиваете LakeFS над любым объектным хранилищем (либо же над файловой системой для тестов), вы больше не используете физические пути до объектов, LakeFS становится вашей файловой системой (FS). При этом LakeFS ничего не хранит данные на своей стороне, а отдаёт это внешнему хранилищу, таким образом выполняя функцию только интерфейса.

Диапазоны и метадиапазоны

  • Формат файла SSTable(Graveler File)
  • Тогда создание коммита:

LakeFS: Где мои данные?!

Как сказано в блоге LakeFS:

These are lakeFS internals and you do not need to know any of the details below in order to use lakeFS at any level.

На из-за этого мне ещё больше захотелось об этом рассказать. :)

Для инженера данных иногда может показаться, что LakeFS изо всех сил старается спрятать ваши данные. Действительно, один очень распространенный вопрос канала LakeFS #help в Slack — это вежливая вариация на тему «где мои данные?!».

LakeFS и правда выполняет роль абстракции поверх хранилища данных и делает это настолько хорошо, что найти сырые данные иногда становится сложно..

Попробуем найти файл main/allstar_games_stat.csv

А что ещё есть в папке data?..

А можно как-то проще это посмотреть?

Установка

Есть несколько вариантов установки:

  • Скачать со страницы релизов архив, там будет один бинарник для lakefs и один для их консольной утилиты lakectl.
  • Использовать docker, ибо есть официальный образ – treeverse/lakefs, через который сразу можно использовать и lakefsи lakectl.

Если мы говорим про production сервис, выделенный под lakefs, то лучше использовать вариант с архивом, чтоб не добавлять абстракцию там, где она не нужна.

Для всего остального лучше использовать docker.

Базовый вариант, чтобы просто проверить работу

Таким образом LakeFS запустится локально, без использования внешнего объектного хранилища, и без БД, в которой должны храниться наши метаданные. Это отличный старт, чтобы просто познакомиться с инструментом, но не рекомендую так в production делать. :)

docker run --name lakefs --rm \
             --publish 8000:8000 \
             treeverse/lakefs:latest \
             run --local-settings

Полноценный docker-compose

version: "3.5"
name: lakefs-example
services:
  lakefs:
    image: treeverse/lakefs:1
    pull_policy: always
    ports:
      - "8000:8000"
    depends_on:
      postgres:
        condition: service_healthy
      minio-setup:
        condition: service_completed_successfully
    environment:
      - LAKEFS_BLOCKSTORE_TYPE=s3
      - LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE=true
      - LAKEFS_BLOCKSTORE_S3_ENDPOINT=http://minio:9000
      - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID=minioadmin
      - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY=minioadmin
      - LAKEFS_AUTH_ENCRYPT_SECRET_KEY=some random secret string
      - LAKEFS_LOGGING_LEVEL=INFO
      - LAKEFS_STATS_ENABLED=${LAKEFS_STATS_ENABLED:-1}
      - LAKECTL_CREDENTIALS_ACCESS_KEY_ID=AKIAIOSFOLKFSSAMPLES
      - LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
      - LAKECTL_SERVER_ENDPOINT_URL=http://localhost:8000
      - LAKEFS_DATABASE_TYPE=postgres
      - LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING=postgres://postgres:postgres@postgres/postgres?sslmode=disable
    entrypoint: ["/bin/sh", "-c"]
    command:
        - |
          lakefs setup --user-name everything-bagel --access-key-id "$$LAKECTL_CREDENTIALS_ACCESS_KEY_ID" --secret-access-key "$$LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY" || true
          lakefs run &
          echo "---- Creating repository ----"
          wait-for -t 60 lakefs:8000 -- curl -u "$$LAKECTL_CREDENTIALS_ACCESS_KEY_ID":"$$LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY" -X POST -H "Content-Type: application/json" -d '{ "name": "quickstart", "storage_namespace": "s3://quickstart", "default_branch": "main", "sample_data": true }' http://localhost:8000/api/v1/repositories || true
          # wait-for -t 60 lakefs:8000 -- lakectl repo create lakefs://example s3://example || true
          echo ""
          wait-for -t 60 minio:9000 && echo '------------------------------------------------

                MinIO admin:   http://127.0.0.1:9001/
                
                               Username : minioadmin
                               Password : minioadmin
                '
          echo "------------------------------------------------"
          echo ""
          echo "lakeFS Web UI: http://127.0.0.1:8000/      >(._.)<"
          echo "                                             (  )_ "
          echo ""
          echo "                Access Key ID    : $$LAKECTL_CREDENTIALS_ACCESS_KEY_ID"
          echo "                Secret Access Key: $$LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"
          echo ""
          echo "-------- Let's go and have axolotl fun! --------"
          echo ""
          wait
  minio-setup:
    image: minio/mc:RELEASE.2023-05-18T16-59-00Z
    environment:
        - MC_HOST_lakefs=http://minioadmin:minioadmin@minio:9000
    depends_on:
      - minio
    volumes:
      - ./data:/data
    entrypoint: ["/bin/sh", "-c"]
    command:
        - |
          mc mb lakefs/quickstart lakefs/example lakefs/sample-data
          exit 0

  minio:
    image: minio/minio:RELEASE.2023-05-18T00-05-36Z
    ports:
      - "9000:9000"
      - "9001:9001"
    entrypoint: ["minio", "server", "/data", "--console-address", ":9001"]

  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "postgres"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always 

networks:
  default:
    name: bagel

lakectl

Для полного списка команд можно посмотреть документацию, а здесь посмотрим на то, чем вы будете пользоваться 99% времени.

Конфигурация

Список доступных репозиториев

Создание репозитория

Коммит

Merge

Интеграция с Python

Инструмент продолжает активно развиваться, поэтому можно столкнуться с тем, что есть старые библиотеки на python, которые выполнили интеграцию с lakefs (в том числе от самих создателей). Однако, работа с lakefs мало чем отличается от работы с любым объектным хранилищем, так что всегда можно использовать любый понравившийся вам S3 клиент.

Например, самый стандартный вариант это библиотека boto3.

import boto3
s3 = boto3.client('s3',
    endpoint_url='https://lakefs.example.com',
    aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')

with open('/local/path/to/file_0', 'rb') as f:
    s3.put_object(Body=f, Bucket='example-repo', Key='main/example-file.parquet')

list_resp = s3.list_objects_v2(Bucket='example-repo', Prefix='c7a632d74f/example-prefix')
for obj in list_resp['Contents']:
    print(obj['Key'])

s3.head_object(Bucket='example-repo', Key='main/example-file.parquet')

Однако, помимо этого, авторами LakeFS сейчас поддерживаются 2 библиотеки для Python, реализующих слегка разный уровень абстракции:

  • lakefs – разработка авторов.
  • lakefs-spec – разработка сообщества, поддерживаемая авторами.

Рассмотрим их чуток подробнее.

lakefs

В этой библиотеке реализованы все необходимые операции, которые чуток проще использовать, чем писать обращения через boto3, в остальном, полностью похожий подход.

from lakefs.client import Client

clt = Client(
    host="http://localhost:8000",
    username="AKIAIOSFODNN7EXAMPLE",
    password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)

repo = lakefs.Repository("example-repo", client=clt).create(storage_namespace="s3://storage-bucket/repos/example-repo")

branch1 = lakefs.repository("example-repo").branch("experiment1").create(source_reference_id="main")

obj = branch1.object(path="text/sample_data.txt").upload(content_type="text/plain", data="This is my object data")
print(obj.stats())

print(obj.reader(mode='r').read())

obj = branch1.object(path="raw/file1.data").upload(data=b"Hello Object World", pre_sign=True)
print(obj.stats())

for diff in branch1.uncommitted():
    print(diff)

ref = branch1.commit(message='Add some data!', metadata={'using': 'python_sdk'})
print(ref.get_commit())

main = repo.branch("main")
for diff in main.diff(other_ref=branch1):
    print(diff)

res = branch1.merge_into(main)
print(res)

При необходимости, можно изучить документацию.

lakefs-spec

Тут решили реализовать слегка иной подход, использующий filesystem_spec для Python. Таким образом, скрывая полностью работу с удаленным объектным хранилищем за простой работой с файловой системой.

from pathlib import Path
from lakefs_spec import LakeFSFileSystem

REPO, BRANCH = "repo", "main"

# Prepare example local data
local_path = Path("demo.txt")
local_path.write_text("Hello, lakeFS!")

# Upload the local file to the repo and commit
fs = LakeFSFileSystem()  # will auto-discover credentials from ~/.lakectl.yaml
repo_path = f"{REPO}/{BRANCH}/{local_path.name}"
with fs.transaction(REPO, BRANCH) as tx:
    fs.put(str(local_path), f"{REPO}/{tx.branch.id}/{local_path.name}")
    tx.commit(message="Add demo data")

# Read back the file contents
f = fs.open(repo_path, "rt")
print(f.readline())  # prints "Hello, lakeFS!"

# Compare the sizes of local file and repo
file_info = fs.info(repo_path)
print(
    f"{local_path.name}: local size: {file_info['size']}, remote size: {local_path.stat().st_size}"
)

# Get information about all files in the repo root
print(fs.ls(f"{REPO}/{BRANCH}/"))

# Delete uploaded file from the repository (and commit)
with fs.transaction(REPO, BRANCH) as tx:
    fs.rm(f"{REPO}/{tx.branch.id}/{local_path.name}")
    tx.commit(message="Delete demo data")

local_path.unlink()

Работает достаточно стабильно, но всё же, обращаю ещё раз внимание, что хоть авторы LakeFS и поддерживают этот проект, они рекомендуют всё же использовать их библиотеку.

Интеграция с Git через lakectl local

Модуль local из lakectl позволяет работать с данными из LakeFS локально.

При помощи этого модуля данные из LakeFS клонируются в локальную папку, которая затем синхронизируется с удаленным репозиторием LakeFS. Это достигается тем, что в выбранную папку помешается файл метаданных, в котором указано, с какой именно веткой нужно синхронизировать папку.

Вы можете синхронизировать сколько угодно репозиториев и веток из них, единственное ограничение, что одна папка может быть привязана только к одной ветке из одного репозитория.

$ lakectl local clone lakefs://is-alpaca/experiment-1/dataset/train/ input

Successfully cloned lakefs://is-alpaca/experiment-1/dataset/train/ to ~/ml_models/is_alpaca/input

Clone Summary:

Downloaded: 250
Uploaded: 0
Removed: 0

Посмотрим синхронизированные папки:

is_alpaca$ lakectl local list                 
+-----------+------------------------------------------------+------------------------------------------------------------------+
| DIRECTORY | REMOTE URI                                     | SYNCED COMMIT                                                    |
+-----------+------------------------------------------------+------------------------------------------------------------------+
| input     | lakefs://is-alpaca/experiment-1/dataset/train/ | 589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3 |
+-----------+------------------------------------------------+------------------------------------------------------------------+

Работа с Git

is_alpaca$ git add input/
is_alpaca$ git status 
On branch experiment-1
Changes to be committed:
  (use "git restore --staged <file>..." to unstage)
    new file:   input/.lakefs_ref.yaml

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
    modified:   .gitignore

А что вообще находится в этом файле:

is_alpaca$ cat input/.lakefs_ref.yaml

src: lakefs://is-alpaca/experiment-1/dataset/train
at_head: 589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3

Можем закоммитить изменения в Git:

is_alpaca$ git commit -m "added is_alpaca dataset" 

Локально добавим файлы и синхронизируем

is_alpaca$ lakectl local status input 
diff 'local:///ml_models/is_alpaca/input' <--> 'lakefs://is-alpaca/589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3/dataset/train/'...
diff 'lakefs://is-alpaca/589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3/dataset/train/' <--> 'lakefs://is-alpaca/experiment-1/dataset/train/'...

╔════════╦════════╦════════════════════════════╗
 SOURCE ║ CHANGE ║ PATH                       ║
╠════════╬════════╬════════════════════════════╣
 local  ║ added  ║ not_alpaca/axolotl2.jpeg   ║
 local  ║ added  ║ not_alpaca/axolotl3.png    ║
 local  ║ added  ║ not_alpaca/axolotl4.jpeg   ║
╚════════╩════════╩════════════════════════════╝

Теперь можно сделать коммит:

is_alpaca$ lakectl local commit input -m "add images of axolotls to the training dataset"

Getting branch: experiment-1

diff 'local:///ml_models/is_alpaca/input' <--> 'lakefs://is-alpaca/589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3/dataset/train/'...
upload not_alpaca/axolotl3.png              ... done! [5.04KB in 679ms]
upload not_alpaca/axolotl2.jpeg             ... done! [38.31KB in 685ms]
upload not_alpaca/axolotl4.jpeg             ... done! [7.70KB in 718ms]

Sync Summary:

Downloaded: 0
Uploaded: 3
Removed: 0

Finished syncing changes. Perform commit on branch...
Commit for branch "experiment-1" completed.

ID: 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6
Message: add images of axolotls to the training dataset
Timestamp: 2024-02-08 17:41:20 +0200 IST
Parents: 589f87704418c6bac80c5a6fc1b52c245af347b9ad1ea8d06597e4437fae4ca3

Создадим новый датасет и затем опубликуем его в LakeFS

is_alpaca$ lakectl local init lakefs://is-alpaca/main/dataset/test/ testDataset 
Location added to /is_alpaca/.gitignore
Successfully linked local directory '/is_alpaca/testDataset' with remote 'lakefs://is-alpaca/main/dataset/test/'
is_alpaca$ lakectl local list                                                           
+-------------+-------------------------------------------------+------------------------------------------------------------------+
| DIRECTORY   | REMOTE URI                                      | SYNCED COMMIT                                                    |
+-------------+-------------------------------------------------+------------------------------------------------------------------+
| input       | lakefs://is-alpaca/main/dataset/train/          | 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6 |
| testDataset | lakefs://is-alpaca/main/dataset/test/           | 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6 |
+-------------+-------------------------------------------------+------------------------------------------------------------------+
is_alpaca$ lakectl local status testDataset 

diff 'local:///ml_models/is_alpaca/testDataset' <--> 'lakefs://is-alpaca/0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6/dataset/test/'...
diff 'lakefs://is-alpaca/0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6/dataset/test/' <--> 'lakefs://is-alpaca/main/dataset/test/'...

╔════════╦════════╦════════════════════════════════╗
 SOURCE ║ CHANGE ║ PATH                           ║
╠════════╬════════╬════════════════════════════════╣
 local  ║ added  ║ alpaca/alpaca (1).jpg
 local  ║ added  ║ alpaca/alpaca (10).jpg
    .        .                  .
    .        .                  .
    .        .                  .
 local  ║ added  ║ not_alpaca/not_alpaca (9).jpg
╚════════╩════════╩════════════════════════════════╝
is_alpaca$ lakectl local commit testDataset -m "add is_alpaca test dataset to lakeFS" 

Getting branch: experiment-1

...

Finished syncing changes. Perform commit on branch...
Commit for branch "experiment-1" completed.

ID: c8be7f4f5c13dd2e489ae85e6f747230bfde8e50f9cd9b6af20b2baebfb576cf
Message: add is_alpaca test dataset to lakeFS
Timestamp: 2024-02-10 12:31:53 +0200 IST
Parents: 0b376f01b925a075851bbaffacf104a80de04a43ed7e56054bf54c42d2c8cce6

Выводы

LakeFS далеко не единственный инструмент, который решает вопрос версионирования данных, однако, как мне кажется, его подход достоин того, чтобы рассмотреть этот инструмент и познакомиться с ним.

Он не добавляет особо нагрузку на вашу уже существующую инфрастуктуру, при этом даёт все необходимые возможности.