Size: a a a

2020 August 12

SS

Sergey Sheremeta in Data Engineers
ну вот показал гобблин, что данные невалидны - они же не в карантин попадают, все равно куда-то дальше по пайплайнам ушли, в расчет витрин/МЛ попали?
как сейчас вычистить эти битые данные?
источник

SS

Sergey Sheremeta in Data Engineers
Andrey Smirnov
просто "наспросить", а почему паркет, я всегда считал что для такого лучше авро.
наверное, тут следует написать что были проведены комплексные сравнения и бенчмаркинг... но нет.
для унификации: на других слоях DataLake используется delta.io формат (тот что от Databricks), а это по сути и есть паркет с метаданными сбоку

ну и ежели я не собираюсь парсить на лету эти данные из Кафки, а хочу лишь свалить их и по возможности сжать - чем мне поможет Авро? схема самих данных мне здесь неважна. а схема датафрейма из Кафки простая и неизменная: key, value, timestamp, offset, partition
источник

AS

Andrey Smirnov in Data Engineers
Sergey Sheremeta
наверное, тут следует написать что были проведены комплексные сравнения и бенчмаркинг... но нет.
для унификации: на других слоях DataLake используется delta.io формат (тот что от Databricks), а это по сути и есть паркет с метаданными сбоку

ну и ежели я не собираюсь парсить на лету эти данные из Кафки, а хочу лишь свалить их и по возможности сжать - чем мне поможет Авро? схема самих данных мне здесь неважна. а схема датафрейма из Кафки простая и неизменная: key, value, timestamp, offset, partition
так ты предотвратишь попадание данных которые не соответствуют схеме
источник

N

Nikita Blagodarnyy in Data Engineers
так дельта енфорсит схему при записи
источник

SS

Sergey Sheremeta in Data Engineers
так лучше получить хоть что-то, чем ничего?
источник

SS

Sergey Sheremeta in Data Engineers
Nikita Blagodarnyy
так дельта енфорсит схему при записи
у меня приземленный поток Кафки попадает в Дельту со схемой:
case class KafkaStage(
   kafka_key: String,
   kafka_value: String,
   kafka_topic: String,
   kafka_partition: Int,
   kafka_offset: Long,
   kafka_ts: Timestamp,
   kafka_dt: Date,
   load_ts: Timestamp)
источник

SS

Sergey Sheremeta in Data Engineers
тут все нормально, схема неизменна,
проблемы когда я хочу вычитать из этой delta/parquet-таблицы и применить
   df.as[KafkaStage]
     .withColumn("val", from_json(col("kafka_value"), SilverTableModel.schema))
источник

SS

Sergey Sheremeta in Data Engineers
вот работало-работало это, а потом узнаю что теперь json'ы в kafka_val чуточку изменились, либо я в трансформациях допустил ошибку

CaseClass/схему я поменял, перекомпилирова, передеплоил джоб.

но где-то в витринах у меня теперь некорректные данные (где-то атрибут пропущен или таймзону я не обработал корректно)
как бы мне сейчас эти данные выявить и обновить на корректные?
источник

N

Nikita Blagodarnyy in Data Engineers
выявить по id батча/offset, обновить-перезачитать с новой схемой и перемерджить
источник

SS

Sergey Sheremeta in Data Engineers
казалось бы причем тут ДатаВольт?
источник

SS

Sergey Sheremeta in Data Engineers
(сейчас-то вброс более лутший?)
источник

N

Nikita Blagodarnyy in Data Engineers
ну а какая разница как они лежат? если 1 wide table принцип вроде тот же самый
источник

DK

Daniyar Kaliyev in Data Engineers
Sergey Sheremeta
подскажите, какие существуют "наилутшие" практики в перепроцессинге сырых данных в DataLake?
вот валится мне из Кафки поток json'ов - я его приземляю в parquet как текстовый столбец через spark structured streaming (это как говорят нынче "бронза").
другим SSS-джобом я парсю "бронзу" согласно схеме (определенной контрактом с поставщиком данных).
в какой-то момент выясняется, что схема изменилась (вот неожиданность-то!!!). либо я где-то напортачил в трансформациях...

и вот я судорожно поправил код джоба, передеплоил его! а что сейчас сделать с некорректными данными от прошлой версии джоба?
а парке файлы это ведь просто с файлы типа csv, а вы hive на него травите или друид? обычно видел в европах люди используют schema registry и пишут в кафку авро, продюсеры если меняют схему, то консьюмеры не падают, просто продолжают работать, только берут те данные которые были в их схеме до тех пор пока консьюмеры не переключат на новую схему, а консьюмеры по факту это джоба которая перевод с авро в парке
источник

ПБ

Повелитель Бури... in Data Engineers
Anton Zadorozhniy
clusterID можно вроде посмотреть в current/VERSION, сравните их между неймнодой и датанодой
источник

АР

Андрей Романов... in Data Engineers
проверьте значения в конфиге параметров dfs.namenode.name.dir и dfs.datanode.data.dir
источник

АР

Андрей Романов... in Data Engineers
это папки с метаданными для nn и dn
источник

АР

Андрей Романов... in Data Engineers
они и будут HADOOP_FILE_SYSTEM
источник

АР

Андрей Романов... in Data Engineers
если их не будет, то надо смотреть папку /tmp/hadoop-hdfs
источник

АР

Андрей Романов... in Data Engineers
и там уже папки namenode, или datanode
источник

S

Stanislav in Data Engineers
Sergey Sheremeta
ну вот показал гобблин, что данные невалидны - они же не в карантин попадают, все равно куда-то дальше по пайплайнам ушли, в расчет витрин/МЛ попали?
как сейчас вычистить эти битые данные?
Почему не в карантин? Данные ж невалидны
источник