PA
Не получается сгруппировать два источника (PCollection) в один и записать результат в базу данных.
Size: a a a
PA
AC
PA
PA
PA
AC
DZ
PA
PA
PA
# Streaming pipeline: read JSON messages from Pub/Sub, branch into two
# PCollections ("source" rows and "ident" rows), left-join them on a common
# key, and additionally write not-yet-known users to BigQuery.
# NOTE(review): the original paste lost indentation — everything below the
# `with` line must be indented inside the pipeline context, otherwise the
# file raises IndentationError before Beam ever runs.
with beam.Pipeline(options=pipeline_options) as p:
    # Renamed from `input` to avoid shadowing the Python builtin.
    parsed_messages = (
        p
        | 'ReadInput' >> beam.io.ReadFromPubSub(
            topic=known_args.input_topic).with_output_types(six.binary_type)
        | 'Decode' >> beam.Map(decode_message)
        | 'Parse' >> beam.Map(parse_json)
    )

    source_pipeline_name = 'source_data'
    source_data = (
        parsed_messages
        | 'etl' >> beam.Map(etl)
    )

    join_pipeline_name = 'ident_data'
    ident_data = (
        parsed_messages
        | 'ident' >> beam.Map(identity)
    )

    # Side branch: persist users flagged as not existing yet.
    identify_users = (
        ident_data
        | 'filter not exist user' >> beam.Filter(lambda x: x['is_exist'] == 0)
        | 'select additional field' >> beam.Map(write_new)
        | 'write new user to db' >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema='ident:STRING, user_id:STRING',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )

    common_key = 'ident'
    # A dict of PCollections is a valid input to a PTransform; LeftJoin's
    # expand() iterates it with .items().
    pipelines_dictionary = {source_pipeline_name: source_data,
                            join_pipeline_name: ident_data}
    test_pipeline = (
        pipelines_dictionary
        | 'Left join' >> LeftJoin(
            source_pipeline_name, source_data,
            join_pipeline_name, ident_data, common_key)
        | 'print' >> beam.Map(print_ln)
    )
PA
PA
PA
class LeftJoin(beam.PTransform):
    """Left-join two PCollections on a shared key.

    Apply this transform to a dict of ``{pipeline_name: pcollection}`` where
    each element of every PCollection is a dict containing ``common_key``.
    Emits one dict per source element, enriched (in-place) with the first
    matching join-side dict when one exists.
    """

    def __init__(self, source_pipeline_name, source_data,
                 join_pipeline_name, join_data, common_key):
        # Let PTransform set up its label machinery — the original skipped
        # this, which can break transform naming in Beam.
        super(LeftJoin, self).__init__()
        self.source_pipeline_name = source_pipeline_name
        # NOTE(review): source_data/join_data are stored but never read in
        # expand() (the PCollections arrive via the dict this transform is
        # applied to); kept for backward compatibility of the signature.
        self.source_data = source_data
        self.join_pipeline_name = join_pipeline_name
        self.join_data = join_data
        self.common_key = common_key

    def expand(self, pcolls):
        """Re-key each branch by common_key, CoGroupByKey, then unnest."""
        def _format_as_common_key_tuple(data_dict, common_key):
            # (key, original dict) pairs — the shape CoGroupByKey expects.
            return data_dict[common_key], data_dict

        # Dict comprehension: one re-keyed PCollection per input branch,
        # keyed by the branch name so UnnestCoGrouped can tell them apart.
        return ({pipeline_name: pcoll
                 | 'Convert to ({0}, object) for {1}'
                     .format(self.common_key, pipeline_name)
                 >> beam.Map(_format_as_common_key_tuple, self.common_key)
                 for (pipeline_name, pcoll) in pcolls.items()}
                | 'CoGroupByKey {0}'.format(pcolls.keys()) >> beam.CoGroupByKey()
                | 'Unnest Cogrouped' >> beam.ParDo(UnnestCoGrouped(),
                                                   self.source_pipeline_name,
                                                   self.join_pipeline_name)
                )
class UnnestCoGrouped(beam.DoFn):
    """Unnest CoGroupByKey output: emit one dict per source-side element,
    merged in-place with the first join-side dict (if any) for that key.
    """

    def process(self, input_element, source_pipeline_name, join_pipeline_name):
        # input_element is (key, {pipeline_name: [dicts...]}).
        _group_key, grouped_dict = input_element
        join_rows = grouped_dict[join_pipeline_name]
        # BUGFIX: the original wrapped `yield` inside try/except IndexError.
        # An IndexError raised *downstream* propagates back through the
        # generator's yield, would be caught here, and the element would be
        # emitted a second time. Use an explicit emptiness check instead.
        enrichment = join_rows[0] if join_rows else None
        for source_dictionary in grouped_dict[source_pipeline_name]:
            if enrichment is not None:
                source_dictionary.update(enrichment)
            yield source_dictionary
class LogContents(beam.DoFn):
    """Log each incoming element (value, type, and its 'ident' field).

    Emits nothing downstream — purely a debugging sink.
    """

    def process(self, input_element):
        # Lazy %-style args: formatting is skipped entirely when the INFO
        # level is disabled (cheaper than eager str.format in a hot DoFn).
        logging.info("Contents: %s", input_element)
        logging.info("Contents type: %s", type(input_element))
        logging.info("Contents Access input_element['ident']: %s",
                     input_element['ident'])
        return
PA
PA
E
PA
PA
PA
E
java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableSet