"""Load a two-column CSV text file into a BigQuery table with Apache Beam.

The pipeline reads lines from a text file, skips the first (header) line,
turns each remaining line into a ``{'column1': ..., 'column2': ...}`` row and
writes the rows to BigQuery, truncating the destination table first and
creating it if it does not exist.
"""

import apache_beam as beam

# BigQuery schema in the compact "name:TYPE, name:TYPE" string form.
table_schema = 'column1:STRING, column2:STRING'
# Destination table.
# NOTE(review): the bracketed parts look like placeholders -- confirm they are
# replaced with real project/dataset/table identifiers before running.
table_spec = "[project_id]:[dataset_id].[table_id]"


def _line_to_row(line):
    """Convert one CSV line into a BigQuery row dict.

    Args:
        line: A single line of text read from the input file.

    Returns:
        A dict with keys ``column1`` and ``column2`` holding the first two
        fields of the line. Any further fields are ignored.

    Raises:
        ValueError: If the line contains fewer than two fields.
    """
    # Imported locally so this function does not depend on module-level
    # globals when Beam serializes it for execution on workers.
    import csv

    # csv.reader handles quoted fields (e.g. 'a,"b,c"'), which a plain
    # str.split(",") would cut in the wrong place.
    fields = next(csv.reader([line]))
    if len(fields) < 2:
        raise ValueError(
            "expected at least 2 comma-separated fields, got %d: %r"
            % (len(fields), line)
        )
    return {'column1': fields[0], 'column2': fields[1]}


with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | "ReadLines" >> beam.io.ReadFromText(
            "gs://path-to-file",
            # A line count, not a flag: skip the single header row.
            skip_header_lines=1,
        )
        | "ToRow" >> beam.Map(_line_to_row)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            # Replace whatever the table currently holds on every run.
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )