Size: a a a

2020 August 22

PA

Panchenko Andrey in Data Engineers
Ребята. Может кто-то помочь подрихтовать пайплан на апаче бим?
Не получается сгруппировать два сорса в один и записать в базу
источник

AC

Alexander Chermenin in Data Engineers
Panchenko Andrey
Ребята. Может кто-то помочь подрихтовать пайплан на апаче бим?
Не получается сгруппировать два сорса в один и записать в базу
Могу попробовать помочь. Хотелось бы увидеть код, на чём запускаете и какая конкретно ошибка?
источник

PA

Panchenko Andrey in Data Engineers
Alexander Chermenin
Могу попробовать помочь. Хотелось бы увидеть код, на чём запускаете и какая конкретно ошибка?
Могу в ЛС?
источник

PA

Panchenko Andrey in Data Engineers
Код на питоне
источник

PA

Panchenko Andrey in Data Engineers
Ошибка в том что ничего не возвращает
источник

AC

Alexander Chermenin in Data Engineers
Panchenko Andrey
Могу в ЛС?
Конечно
источник

DZ

Dmitry Zuev in Data Engineers
Panchenko Andrey
Могу в ЛС?
А смысл?
источник

PA

Panchenko Andrey in Data Engineers
Ок
источник

PA

Panchenko Andrey in Data Engineers
Переслано от Panchenko Andrey
Приветствую
источник

PA

Panchenko Andrey in Data Engineers
Переслано от Panchenko Andrey
  with beam.Pipeline(options=pipeline_options) as p:

       input = (p
            | 'ReadInput' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(six.binary_type)
            | 'Decode' >> beam.Map(decode_message)
            | 'Parse' >> beam.Map(parse_json)
           )

       source_pipeline_name = 'source_data'
       source_data = (input
            | 'etl' >> beam.Map(etl)
                       )

       join_pipeline_name = 'ident_data'
       ident_data = (
           input
           | 'ident' >> beam.Map(identity)

       )

       identify_users = (
           ident_data
           | 'filter not exist user' >> beam.Filter(lambda x: x['is_exist'] == 0)
           | 'select additional field' >> beam.Map(write_new)
           | 'write new user to db' >> beam.io.WriteToBigQuery(
               known_args.output_table,
               schema='ident:STRING, user_id:STRING',
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )

       )

       common_key = 'ident'
       pipelines_dictionary = {source_pipeline_name: source_data,
                               join_pipeline_name: ident_data}
       test_pipeline = (pipelines_dictionary
                        | 'Left join' >> LeftJoin(
                           source_pipeline_name, source_data,
                           join_pipeline_name, ident_data, common_key)


                        | 'print' >> beam.Map(print_ln)

                        )
источник

PA

Panchenko Andrey in Data Engineers
Переслано от Panchenko Andrey
на принт ничего не отдает
источник

PA

Panchenko Andrey in Data Engineers
Переслано от Panchenko Andrey
вот используемые классы
источник

PA

Panchenko Andrey in Data Engineers
Переслано от Panchenko Andrey
 class LeftJoin(beam.PTransform):
   """This PTransform performs a left join given source_pipeline_name, source_data,
    join_pipeline_name, join_data, common_key constructors"""

   def __init__(self, source_pipeline_name, source_data, join_pipeline_name, join_data, common_key):
       self.join_pipeline_name = join_pipeline_name
       self.source_data = source_data
       self.source_pipeline_name = source_pipeline_name
       self.join_data = join_data
       self.common_key = common_key

   def expand(self, pcolls):
       def _format_as_common_key_tuple(data_dict, common_key):
           return data_dict[common_key], data_dict

       """This part here below starts with a python dictionary comprehension in case you
       get lost in what is happening :-)"""
       return ({pipeline_name: pcoll
                               | 'Convert to ({0}, object) for {1}'
               .format(self.common_key, pipeline_name)
                               >> beam.Map(_format_as_common_key_tuple, self.common_key)
                for (pipeline_name, pcoll) in pcolls.items()}
               | 'CoGroupByKey {0}'.format(pcolls.keys()) >> beam.CoGroupByKey()
               | 'Unnest Cogrouped' >> beam.ParDo(UnnestCoGrouped(),
                                                  self.source_pipeline_name,
                                                  self.join_pipeline_name)
               )


class UnnestCoGrouped(beam.DoFn):
   """This DoFn class unnests the CogroupBykey output and emits """

   def process(self, input_element, source_pipeline_name, join_pipeline_name):
       group_key, grouped_dict = input_element
       join_dictionary = grouped_dict[join_pipeline_name]
       source_dictionaries = grouped_dict[source_pipeline_name]
       for source_dictionary in source_dictionaries:
           try:
               source_dictionary.update(join_dictionary[0])
               yield source_dictionary
           except IndexError:  # found no join_dictionary
               yield source_dictionary


class LogContents(beam.DoFn):
   """This DoFn class logs the content of that which it receives """

   def process(self, input_element):
       logging.info("Contents: {}".format(input_element))
       logging.info("Contents type: {}".format(type(input_element)))
       logging.info("Contents Access input_element['ident']: {}".format(input_element['ident']))
       return
источник

PA

Panchenko Andrey in Data Engineers
Переслано от Panchenko Andrey
Или может есть просто готовый кусок кода для группиррвки двух сорсов по ключу и для записи в базу
источник

PA

Panchenko Andrey in Data Engineers
Переслано от Panchenko Andrey
А то мучаюсь два дня уже
источник

E

El-Yaz in Data Engineers
Panchenko Andrey
Переслано от Panchenko Andrey
А то мучаюсь два дня уже
Совет как решить проблему не дам, а вот совет как код выдавать - дам: не кидай это полотно на 3 экрана в чат плз, юзай gist или pastebin
источник

PA

Panchenko Andrey in Data Engineers
Спс
источник
2020 August 23

PA

Panchenko Andrey in Data Engineers
Ребята привет. помогите плизз ничего у меня так и не вышло с мерджем двух словарей
https://pastebin.com/zTvpBt2r
источник

PA

Panchenko Andrey in Data Engineers
Apache beam, python
источник

E

El-Yaz in Data Engineers
Запускаю elastic search sink connector для kafka, указал оба .properties, с шела не стартует скрипт: java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableSet
коннектор скачал с https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch напрямую. гугл подсказал добавить гуаву к проекту: https://stackoverflow.com/a/18699682/5151861
добавил с dependencies, ошибка не исчезла и потом осознал, что раню с шела, а не мавеном, поэтому чуда и не случается.
Возможно, была у кого такая проблема и знает как ее решать?
источник