Scheduling load from BigQuery to MongoDB using Apache Airflow (Cloud Composer)

I am trying to set up data pipelines that move data from GCS to BigQuery, perform certain tasks/processing, and load the result into a MongoDB cluster (all set up in Python using DAGs). I have been able to achieve everything up to the load into MongoDB. Are there any existing Airflow operators that can do this? If not, is it possible to write custom code using the MongoDB hooks provided in Airflow?
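Something like the following is what I have in mind, i.e. using the MongoHook from airflow.contrib.hooks.mongo_hook inside a PythonOperator. This is only a sketch under those assumptions; the connection id, task ids and collection names below are made up:

from airflow.contrib.hooks.mongo_hook import MongoHook
from airflow.operators.python_operator import PythonOperator

def load_into_mongo(**kwargs):
    # Pull whatever the upstream processing task returned (assumed to be a list of dicts).
    docs = kwargs['task_instance'].xcom_pull(task_ids='process_data')
    hook = MongoHook(conn_id='mongo_default')  # hypothetical Airflow connection id
    # insert_many(collection, docs, mongo_db=...) bulk-inserts through the hook's MongoClient.
    hook.insert_many('my_collection', docs, mongo_db='test')

push_to_mongo = PythonOperator(
    task_id='push_to_mongo',
    python_callable=load_into_mongo,
    provide_context=True,
    dag=dag)  # dag defined elsewhere in the DAG file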



Thanks,
GT



EDIT 1



I used the MongoHook and the source code of BigQueryGetDataOperator (code snippet below). My problem now is that I need to make this work for 10+ million rows, and when I increase the max_results='100' default in BigQueryGetDataOperator I get this error:



sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1")



I know I should be pushing data into XCom in chunks (the error above is the Airflow metadata database rejecting an XCom value that is too large for its 'value' column), but I am not sure that can really be done. Is there a standard way to pass large amounts of data through XCom? Any other way of achieving this with Airflow would also be helpful. The only alternative I can think of is writing the data to GCS, loading it into MongoDB from there, and then deleting the GCS file.
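To make the chunking idea concrete, this is the kind of thing I was imagining (only a sketch with made-up task ids and a placeholder fetch function; every chunk still ends up in the Airflow metadata database, which is why I doubt it scales to 10+ million rows):

def push_rows_in_chunks(**kwargs):
    ti = kwargs['task_instance']
    rows = fetch_rows_from_bq()  # placeholder for the BigQuery fetch
    chunk_size = 10000
    chunks = [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]
    for n, chunk in enumerate(chunks):
        # One XCom entry per chunk keeps each serialized value small.
        ti.xcom_push(key='rows_chunk_%d' % n, value=chunk)
    ti.xcom_push(key='num_chunks', value=len(chunks))

def pull_rows_in_chunks(**kwargs):
    ti = kwargs['task_instance']
    num_chunks = ti.xcom_pull(task_ids='push_rows_task', key='num_chunks')
    for n in range(num_chunks):
        chunk = ti.xcom_pull(task_ids='push_rows_task', key='rows_chunk_%d' % n)
        # ... insert this chunk into MongoDB here ...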



#-------- COPY PASTED BigQueryGetDataOperator SECTION: START --------------
'''Source: https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/operators/bigquery_get_data.html#BigQueryGetDataOperator '''

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryGetDataOperator(BaseOperator):
    template_fields = ('dataset_id', 'table_id', 'max_results')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 max_results='100',
                 selected_fields=None,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.max_results = max_results
        self.selected_fields = selected_fields
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        self.log.info('Fetching Data from:')
        self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                      self.dataset_id, self.table_id, self.max_results)

        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)

        conn = hook.get_conn()
        cursor = conn.cursor()
        response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                        table_id=self.table_id,
                                        max_results=self.max_results,
                                        selected_fields=self.selected_fields)

        self.log.info('Total Extracted rows: %s', response['totalRows'])
        rows = response['rows']
        return rows
        # The lines below were commented out as I did not want a list of lists but a dict per row
        # table_data = []
        # for dict_row in rows:
        #     single_row = []
        #     for fields in dict_row['f']:
        #         single_row.append(fields['v'])
        #     table_data.append(single_row)

        # return table_data
#----------------------- COPY PASTED SECTION: END ----------------

import datetime

from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.utils import trigger_rule
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_get_data
from airflow.contrib.hooks.mongo_hook import MongoHook


def get_dlist(**kwargs):
    import logging as log
    # Import pymongo
    from pymongo import MongoClient
    # Pull the data saved in XCom
    value = kwargs.get('task_instance').xcom_pull(task_ids='get_data_in_list_from_bq')

    header = ['V1', 'V2']
    data = []
    for rows in value:
        onerow = {}
        for i, f in zip(range(len(rows['f'])), rows['f']):
            onerow[header[i]] = f['v']
        data.append(onerow)
    log.info("Pulled...")
    log.info(data)
    log.info("Pushing into mongodb...")
    client = MongoClient('localhost:27017')
    db = client.test
    collection = db.testingbq2mongo
    collection.insert(data)
    log.info("Written to mongoDB...")
    client.close()


# Start date used by default_dag_args below (as in the Cloud Composer quickstart).
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    # 'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'data-rubrics'
}

try:
    # [START composer_quickstart_schedule]
    with models.DAG(
            'composer_testing00',
            # Continue to run DAG once per day
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
        # [END composer_quickstart_schedule]

        data_list = bigquery_get_data.BigQueryGetDataOperator(
            task_id='get_data_in_list_from_bq',
            dataset_id='testcomposer',  # Name of the dataset which contains the table (a BQ term)
            table_id='summarized_sample_T1'  # Name of the BQ table you want to push into MongoDB
        )

        op_push2mongo = PythonOperator(task_id='Push_to_MongoDB', python_callable=get_dlist, provide_context=True)
        data_list >> op_push2mongo
except Exception as e:
    raise(e)


EDIT 2



#-------- COPY PASTED BigQueryGetDataOperator SECTION: START --------------
'''Source: https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/operators/bigquery_get_data.html#BigQueryGetDataOperator '''

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryGetDataOperator(BaseOperator):
    template_fields = ('dataset_id', 'table_id', 'max_results')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 max_results='100',
                 selected_fields=None,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.max_results = max_results
        self.selected_fields = selected_fields
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        self.log.info('Fetching Data from:')
        self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                      self.dataset_id, self.table_id, self.max_results)

        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)

        conn = hook.get_conn()
        cursor = conn.cursor()

        #----------------------- COPY PASTED SECTION: END ----------------

        # Trying to add to a MongoDB here itself - code by GT
        from pymongo import MongoClient
        header = ['day', 'ticker', 'app_id', 'area', 'store_types', 'devices_in_store',
                  'devices_in_store_or_plot', 'matched_devices', 'all_devices']
        client = MongoClient('35.237.46.25:27017')
        db = client.test03
        collection = db.advan_t1_sample_mongo00

        # First call only fetches totalRows so we know how many chunks to pull.
        response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=0,
                                        table_id=self.table_id,
                                        max_results='2',
                                        selected_fields=self.selected_fields)
        total_rows = int(response['totalRows'])
        chunksize = 100000
        for chunk in range(0, total_rows, chunksize):
            rows = []
            if chunk + chunksize < total_rows:
                self.log.info("Extracting chunk %d to %d" % (chunk, chunk + chunksize))
                response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=chunk,
                                                table_id=self.table_id,
                                                max_results=str(chunksize),
                                                selected_fields=self.selected_fields)

                rows = response['rows']

                for row in rows:
                    onerow = {}
                    for i, f in zip(range(len(row['f'])), row['f']):
                        onerow[header[i]] = f['v']
                    collection.insert_one(onerow)
                self.log.info("------------------------- Document size: %d --------------------" % (collection.find().count()))
            else:
                self.log.info("Extracting chunk %d to %d" % (chunk, total_rows))
                response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=chunk,
                                                table_id=self.table_id,
                                                max_results=total_rows,
                                                selected_fields=self.selected_fields)

                rows = response['rows']

                for row in rows:
                    onerow = {}
                    for i, f in zip(range(len(row['f'])), row['f']):
                        onerow[header[i]] = f['v']
                    collection.insert_one(onerow)
                self.log.info("------------------------- Document size: %d --------------------" % (collection.find().count()))

        self.log.info("Pushed into %s" % collection.name)

        if total_rows == collection.find().count():
            self.log.info("Successfully pushed %d records into %s" % (total_rows, collection.name))
            return(1)
        else:
            self.log.warning("Push Failed! Total Rows: %d Document Size: %d" % (total_rows, collection.find().count()))
            return(0)
        # return rows

import datetime

from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.utils import trigger_rule
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_get_data
from airflow.contrib.hooks.mongo_hook import MongoHook


def get_dlist(**kwargs):
    import logging as log
    # Import pymongo
    from pymongo import MongoClient
    # Pull the data saved in XCom
    value = kwargs.get('task_instance').xcom_pull(task_ids='get_data_in_list_from_bq')

    header = ['V1', 'V2']
    data = []
    for rows in value:
        onerow = {}
        for i, f in zip(range(len(rows['f'])), rows['f']):
            onerow[header[i]] = f['v']
        data.append(onerow)
    log.info("Pulled...")
    log.info(data)
    log.info("Pushing into mongodb...")
    client = MongoClient('localhost:27017')
    db = client.test
    collection = db.testingbq2mongo
    collection.insert(data)
    log.info("Written to mongoDB...")
    client.close()


# Start date used by default_dag_args below (as in the Cloud Composer quickstart).
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    # 'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'data-rubrics'
}

try:
    # [START composer_quickstart_schedule]
    with models.DAG(
            'composer_testing00',
            # Continue to run DAG once per day
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
        # [END composer_quickstart_schedule]

        data_list = bigquery_get_data.BigQueryGetDataOperator(
            task_id='get_data_in_list_from_bq',
            dataset_id='testcomposer',  # Name of the dataset which contains the table (a BQ term)
            table_id='summarized_sample_T1'  # Name of the BQ table you want to push into MongoDB
        )

        op_push2mongo = PythonOperator(task_id='Push_to_MongoDB', python_callable=get_dlist, provide_context=True)
        data_list >> op_push2mongo
except Exception as e:
    raise(e)



Tags: python mongodb google-bigquery airflow google-cloud-composer
2 Answers

The easiest/quickest way would be to use a PythonOperator and access the necessary hook object directly.

If you need to do this often, I would recommend packaging the code up as a custom operator.
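For example, roughly along these lines (an untested sketch: it assumes the MongoHook in airflow.contrib.hooks.mongo_hook, a Mongo connection configured in Airflow, and made-up parameter names):

from airflow.contrib.hooks.mongo_hook import MongoHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryToMongoOperator(BaseOperator):
    """Illustrative only: pull rows produced by an upstream task and insert them into MongoDB."""

    @apply_defaults
    def __init__(self, source_task_id, mongo_collection, mongo_db=None,
                 mongo_conn_id='mongo_default', *args, **kwargs):
        super(BigQueryToMongoOperator, self).__init__(*args, **kwargs)
        self.source_task_id = source_task_id
        self.mongo_collection = mongo_collection
        self.mongo_db = mongo_db
        self.mongo_conn_id = mongo_conn_id

    def execute(self, context):
        # Whatever the upstream task returned (assumed to be a list of dicts).
        docs = context['task_instance'].xcom_pull(task_ids=self.source_task_id)
        hook = MongoHook(conn_id=self.mongo_conn_id)
        # MongoHook.insert_many bulk-inserts the documents into the given collection.
        hook.insert_many(self.mongo_collection, docs, mongo_db=self.mongo_db)

In the DAG it would then replace the PythonOperator step, e.g. BigQueryToMongoOperator(task_id='Push_to_MongoDB', source_task_id='get_data_in_list_from_bq', mongo_collection='testingbq2mongo', mongo_db='test').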

• And if possible to PR this operator to Airflow repository so everyone can use it. – Luis, Dec 9 '18 at 15:39

It's considered an antipattern to pass large amounts of data through XComs. I would recommend writing the data from BigQuery to a durable storage service like Cloud Storage, then loading it into MongoDB from there.
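For example (only a sketch; the bucket, object paths, table and Mongo details are placeholders, and it assumes the contrib BigQueryToCloudStorageOperator and GoogleCloudStorageHook that ship with Airflow 1.10):

import json

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from pymongo import MongoClient

# Export the BigQuery table to newline-delimited JSON shards in GCS.
bq_to_gcs = BigQueryToCloudStorageOperator(
    task_id='bq_to_gcs',
    source_project_dataset_table='data-rubrics.testcomposer.summarized_sample_T1',
    destination_cloud_storage_uris=['gs://my-bucket/exports/summarized_sample_T1-*.json'],
    export_format='NEWLINE_DELIMITED_JSON',
    dag=dag)  # dag: the DAG object from your DAG file

def gcs_to_mongo(**kwargs):
    gcs = GoogleCloudStorageHook()
    client = MongoClient('localhost:27017')
    collection = client.test.summarized_sample_T1
    # Stream every exported shard into MongoDB, one file at a time.
    for obj in gcs.list('my-bucket', prefix='exports/summarized_sample_T1-'):
        data = gcs.download('my-bucket', obj)
        text = data.decode('utf-8') if isinstance(data, bytes) else data
        docs = [json.loads(line) for line in text.splitlines() if line]
        if docs:
            collection.insert_many(docs)
    client.close()

load_to_mongo = PythonOperator(
    task_id='gcs_to_mongo',
    python_callable=gcs_to_mongo,
    dag=dag)

bq_to_gcs >> load_to_mongo

Once the load succeeds, a small cleanup task (or a GCS delete operator, if your Airflow version ships one) can remove the exported files, as the question already suggests.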