Scheduling load from BigQuery to MongoDB using apache-airflow (cloud-composer)
I am trying to set up data pipelines that move data from GCS to BigQuery, perform certain processing tasks, and load the result into a MongoDB cluster (all set up in Python using DAGs). I have been able to achieve this up until the load into MongoDB. Are there any existing Airflow operators that can do this? If not, is it possible to write custom code using the MongoDB hooks provided in Airflow?
Thanks,
GT
EDIT 1
I used the MongoHook and the source code of BigQueryGetDataOperator (code snippet below). My problem now is that I need to make this work for 10+ million rows, and when I increase the max_results='100' default in BigQueryGetDataOperator I get the following error:
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1")
I know I should be pushing data into XCom in chunks, but I am not sure whether that can really be done. Is there a standard way to pass large amounts of data through XCom? Any other way of achieving this with Airflow would also be helpful. The only thing I can think of is writing the data to GCS, loading it into MongoDB, and then deleting the GCS file.
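For reference, the chunked XCom idea I have in mind would look roughly like the sketch below (the task ids, chunk size, and the fetch_rows_from_bq helper are hypothetical placeholders; the BigQuery read itself is omitted):

    def push_in_chunks(**kwargs):
        # Push rows to XCom under indexed keys so no single XCom value
        # exceeds the metadata database's column size limit.
        ti = kwargs['task_instance']
        rows = fetch_rows_from_bq()      # hypothetical helper for the BigQuery read
        chunk_size = 1000                # arbitrary; tune to stay under the column limit
        n_chunks = 0
        for start in range(0, len(rows), chunk_size):
            ti.xcom_push(key='rows_chunk_%d' % n_chunks, value=rows[start:start + chunk_size])
            n_chunks += 1
        ti.xcom_push(key='n_chunks', value=n_chunks)

    def pull_in_chunks(**kwargs):
        # Pull the chunks back one key at a time and insert each into MongoDB.
        ti = kwargs['task_instance']
        n_chunks = ti.xcom_pull(task_ids='push_in_chunks_task', key='n_chunks')
        for i in range(n_chunks):
            chunk = ti.xcom_pull(task_ids='push_in_chunks_task', key='rows_chunk_%d' % i)
            # ...insert the chunk into the MongoDB collection here...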
#-------- COPY PASTED BigQueryGetDataOperator SECTION: START --------------
'''Source: https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/operators/bigquery_get_data.html#BigQueryGetDataOperator '''
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryGetDataOperator(BaseOperator):
    template_fields = ('dataset_id', 'table_id', 'max_results')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 max_results='100',
                 selected_fields=None,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.max_results = max_results
        self.selected_fields = selected_fields
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        self.log.info('Fetching Data from:')
        self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                      self.dataset_id, self.table_id, self.max_results)
        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)
        conn = hook.get_conn()
        cursor = conn.cursor()
        response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                        table_id=self.table_id,
                                        max_results=self.max_results,
                                        selected_fields=self.selected_fields)
        self.log.info('Total Extracted rows: %s', response['totalRows'])
        rows = response['rows']
        return rows
        # Lines below were commented out as I did not want a list but a JSON-like dict
        # table_data = []
        # for dict_row in rows:
        #     single_row = []
        #     for fields in dict_row['f']:
        #         single_row.append(fields['v'])
        #     table_data.append(single_row)
        # return table_data
#----------------------- COPY PASTED SECTION: END ----------------
import datetime

from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.utils import trigger_rule
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_get_data
from airflow.contrib.hooks.mongo_hook import MongoHook  # MongoHook lives under hooks, not operators

# Start the DAG one day ago so it is picked up immediately.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())


def get_dlist(**kwargs):
    import logging as log
    # Import pymongo
    from pymongo import MongoClient
    # Pull the data saved in XCom
    value = kwargs.get('task_instance').xcom_pull(task_ids='get_data_in_list_from_bq')
    header = ['V1', 'V2']
    data = []
    for rows in value:
        onerow = {}
        for i, f in zip(range(len(rows['f'])), rows['f']):
            onerow[header[i]] = f['v']
        data.append(onerow)
    log.info("Pulled...")
    log.info(data)
    log.info("Pushing into mongodb...")
    client = MongoClient('localhost:27017')
    db = client.test
    collection = db.testingbq2mongo
    collection.insert(data)
    log.info("Written to mongoDB...")
    client.close()
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    # 'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'data-rubrics'
}

try:
    # [START composer_quickstart_schedule]
    with models.DAG(
            'composer_testing00',
            # Continue to run DAG once per day
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
        # [END composer_quickstart_schedule]
        data_list = bigquery_get_data.BigQueryGetDataOperator(
            task_id='get_data_in_list_from_bq',
            dataset_id='testcomposer',       # Name of the dataset that contains the table (BQ terminology)
            table_id='summarized_sample_T1'  # Name of the BQ table you want to push into MongoDB
        )

        op_push2mongo = PythonOperator(task_id='Push_to_MongoDB',
                                       python_callable=get_dlist,
                                       provide_context=True)

        data_list >> op_push2mongo
except Exception as e:
    raise(e)
EDIT 2
#-------- COPY PASTED BigQueryGetDataOperator SECTION: START --------------
'''Source: https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/operators/bigquery_get_data.html#BigQueryGetDataOperator '''
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryGetDataOperator(BaseOperator):
    template_fields = ('dataset_id', 'table_id', 'max_results')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 max_results='100',
                 selected_fields=None,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.max_results = max_results
        self.selected_fields = selected_fields
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        self.log.info('Fetching Data from:')
        self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                      self.dataset_id, self.table_id, self.max_results)
        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)
        conn = hook.get_conn()
        cursor = conn.cursor()
        #----------------------- COPY PASTED SECTION: END ----------------
        # Trying to add to MongoDB here itself - code by GT
        from pymongo import MongoClient

        header = ['day', 'ticker', 'app_id', 'area', 'store_types', 'devices_in_store',
                  'devices_in_store_or_plot', 'matched_devices', 'all_devices']
        client = MongoClient('35.237.46.25:27017')
        db = client.test03
        collection = db.advan_t1_sample_mongo00

        # Small initial request just to read totalRows.
        response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=0,
                                        table_id=self.table_id,
                                        max_results='2',
                                        selected_fields=self.selected_fields)
        total_rows = int(response['totalRows'])
        chunksize = 100000
        for chunk in range(0, total_rows, chunksize):
            rows = []
            if chunk + chunksize < total_rows:
                self.log.info("Extracting chunk %d to %d" % (chunk, chunk + chunksize))
                response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=chunk,
                                                table_id=self.table_id,
                                                max_results=str(chunksize),
                                                selected_fields=self.selected_fields)
                rows = response['rows']
                for row in rows:
                    onerow = {}
                    for i, f in zip(range(len(row['f'])), row['f']):
                        onerow[header[i]] = f['v']
                    collection.insert_one(onerow)
                self.log.info("------------------------- Document size: %d --------------------" % (collection.find().count()))
            else:
                self.log.info("Extracting chunk %d to %d" % (chunk, total_rows))
                response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=chunk,
                                                table_id=self.table_id,
                                                max_results=str(total_rows),
                                                selected_fields=self.selected_fields)
                rows = response['rows']
                for row in rows:
                    onerow = {}
                    for i, f in zip(range(len(row['f'])), row['f']):
                        onerow[header[i]] = f['v']
                    collection.insert_one(onerow)
                self.log.info("------------------------- Document size: %d --------------------" % (collection.find().count()))
        self.log.info("Pushed into %s" % collection.name)
        if total_rows == collection.find().count():
            self.log.info("Successfully pushed %d records into %s" % (total_rows, collection.name))
            return 1
        else:
            self.log.warning("Push Failed! Total Rows: %d Document Size: %d" % (total_rows, collection.find().count()))
            return 0
        # return rows
(The module imports, the get_dlist callable, default_dag_args, and the DAG definition that followed here in EDIT 2 are identical to the EDIT 1 listing above.)
python mongodb google-bigquery airflow google-cloud-composer
asked Nov 20 '18 at 13:14 by Gaurav Taneja; edited Dec 11 '18 at 9:50
2 Answers
The easiest/quickest way would be to use a PythonOperator and access the necessary hook object directly. If you need to do this often, I would recommend packaging the code up as a custom operator.

answered Nov 21 '18 at 1:43 by joeb
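A minimal sketch of that approach, assuming the contrib BigQueryHook and MongoHook shipped with Airflow 1.10 (the connection ids, dataset, table, collection name, and page size below are placeholders; insert_many may not exist on older MongoHook versions, in which case get_conn() returns a pymongo client you can use directly):

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook
    from airflow.contrib.hooks.mongo_hook import MongoHook

    def bq_to_mongo(**kwargs):
        # Stream BigQuery pages straight into MongoDB inside one task,
        # so nothing large ever goes through XCom.
        cursor = BigQueryHook(bigquery_conn_id='bigquery_default').get_conn().cursor()
        mongo = MongoHook(conn_id='mongo_default')   # assumes a 'mongo_default' Airflow connection
        page_size = 10000                            # arbitrary page size
        start = 0
        while True:
            resp = cursor.get_tabledata(dataset_id='testcomposer',
                                        table_id='summarized_sample_T1',
                                        start_index=start,
                                        max_results=str(page_size))
            rows = resp.get('rows', [])
            if not rows:
                break
            docs = [{'col_%d' % i: f['v'] for i, f in enumerate(row['f'])} for row in rows]
            mongo.insert_many(mongo_collection='bq_rows', docs=docs, mongo_db='test')
            start += page_size

    # Inside the existing `with models.DAG(...)` block:
    # push_task = PythonOperator(task_id='bq_to_mongo', python_callable=bq_to_mongo,
    #                            provide_context=True)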
Comment (Luis, Dec 9 '18 at 15:39): And if possible, PR this operator to the Airflow repository so everyone can use it.
It's considered an antipattern to pass large amounts of data in XComs. I would recommend writing the data from BigQuery to a durable storage service like Cloud Storage and then loading it into MongoDB from there.

answered Dec 10 '18 at 23:50 by Wilson
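A rough sketch of that staging pattern, assuming the contrib BigQueryToCloudStorageOperator and GoogleCloudStorageHook (the bucket, object names, Mongo URI, and the single-object export below are illustrative assumptions; a large export will usually be sharded across several objects):

    import json

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
    from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
    from airflow.operators.python_operator import PythonOperator
    from pymongo import MongoClient

    # Export the table to newline-delimited JSON in a staging bucket.
    export_to_gcs = BigQueryToCloudStorageOperator(
        task_id='bq_table_to_gcs',
        source_project_dataset_table='data-rubrics.testcomposer.summarized_sample_T1',
        destination_cloud_storage_uris=['gs://my-staging-bucket/export/rows.json'],
        export_format='NEWLINE_DELIMITED_JSON')

    def gcs_to_mongo(**kwargs):
        # Download the exported file, bulk-insert it into MongoDB, then delete it.
        gcs = GoogleCloudStorageHook()
        client = MongoClient('mongodb://localhost:27017')   # placeholder Mongo URI
        collection = client.test.bq_rows
        local_path = '/tmp/rows.json'
        gcs.download(bucket='my-staging-bucket', object='export/rows.json', filename=local_path)
        batch = []
        with open(local_path) as fh:
            for line in fh:
                batch.append(json.loads(line))
                if len(batch) >= 10000:
                    collection.insert_many(batch)
                    batch = []
        if batch:
            collection.insert_many(batch)
        gcs.delete(bucket='my-staging-bucket', object='export/rows.json')

    # Inside the existing `with models.DAG(...)` block:
    # load_to_mongo = PythonOperator(task_id='gcs_to_mongo', python_callable=gcs_to_mongo,
    #                                provide_context=True)
    # export_to_gcs >> load_to_mongo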