Jan 24, 2020

Alembic Basics

Alembic

alembic revision --autogenerate   # generate a migration script by diffing the models against the DB
alembic history                   # list the revision history

alembic stamp head                # mark the DB as current without running any migrations
alembic stamp <rev_no>            # mark the DB as being at a specific revision

alembic upgrade head              # apply all pending migrations
alembic downgrade base            # revert every migration
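
A typical end-to-end flow, assuming sqlalchemy.url is configured in alembic.ini (the revision message is only illustrative):

alembic init alembic                            # one-time: scaffold the migrations directory
alembic revision --autogenerate -m "add users"  # generate a migration script
alembic upgrade head                            # apply it
alembic downgrade -1                            # roll back one revision if needed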


Jan 12, 2020

pytest Tutorial 3 - PyCharm Options


  • pytest.ini
    • You can define default options like -v, -m, -x, etc. (see the sample pytest.ini after this list)
    • [pytest]
    • addopts = -v
  • Auto-run options
    • When you make changes, the tests auto-run
    • You can set an interval of 2, 3, 5 secs, etc.
    • You can find this under the settings gear icon in PyCharm
  • Make pytest the default runner
    • By default, PyCharm points to unittest
    • You can change it to pytest in PyCharm's test configuration under
    • Tools -> Python Integrated Tools -> Default Test Runner
  • Run configuration for individual files
    • For each file, you can set -v, -m options
    • Run -> Default Configurations -> Additional Arguments
    • -v
  • Set options globally for all test files instead of per file
    • Run -> Edit Configurations -> Defaults -> Python Tests -> py.test
    • Add the options here in Additional Arguments
    • This will apply to all the test files
  • Window -> Edit Tabs -> Split Vertically
    • You often need to write test cases while looking at your code files/README
    • Splitting helps you view both files side by side at the same time
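
A sample pytest.ini with default options plus marker registration (the marker names match the tests in Tutorial 1; registering them avoids "unknown mark" warnings):

[pytest]
addopts = -v
markers =
    number: tests that exercise numeric inputs
    strings: tests that exercise string inputs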

pytest in classes

import pytest

class TestSomeStuff:
    def test_one(self):
        assert 1 == 1

    def test_two(self):
        assert 2 == 2

    def test_three(self):
        assert 3 == 3

class TestOtherStuff:
    def test_eleven(self):
        assert 11 == 11

    def test_twenty_two(self):
        assert 22 == 22

    def test_thirty_three(self):
        assert 33 == 33

We can run all the tests at once, or treat each class as its own suite:
TestSomeStuff
TestOtherStuff
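
To run a single class (or one test inside it) as a suite, assuming the file is named test_stuff.py:

pytest test_stuff.py::TestSomeStuff -v
pytest test_stuff.py::TestOtherStuff::test_eleven -v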


Jan 11, 2020

pytest Tutorial 2 - Fixtures


student.py

import json

class StudentDB:
    def __init__(self):
        self.__data = None

    def connect(self, data_file):
        with open(data_file) as json_file:
            self.__data = json.load(json_file)  # load from the file object, not the file name

    def get_data(self, name):
        for student in self.__data['students']:
            if student['name'] == name:
                return student

    def close(self):
        pass  # nothing to release for a plain JSON file; the fixture below calls this, a real DB would close its connection here

test_student.py

from student import StudentDB
import pytest

# A fixture replaces writing separate setup and teardown functions
# Without a fixture, you would write setup_module and teardown_module functions instead
# You pass db as an argument to every unit test that needs it
# If you don't pass scope='module', the fixture runs for each and every unit test
@pytest.fixture(scope='module')
def db():
    print('-------Inside setup-------')
    db = StudentDB()
    db.connect('data.json')
    yield db
    print('-------Inside teardown-------')  # module scope: runs once, at the end of the module
    db.close()

def test_scott_data(db):  # db is injected from the fixture above
    scott_data = db.get_data('Scott')
    assert scott_data['id'] == 1
    assert scott_data['name'] == 'Scott'
    assert scott_data['result'] == 'Pass'

def test_mark_data(db):  # db is injected from the fixture above
    mark_data = db.get_data('Mark')
    assert mark_data['id'] == 2
    assert mark_data['name'] == 'Mark'
    assert mark_data['result'] == 'Fail'
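
A data.json matching the assertions above would look like:

{
  "students": [
    {"id": 1, "name": "Scott", "result": "Pass"},
    {"id": 2, "name": "Mark", "result": "Fail"}
  ]
}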


pytest Tutorial 1 - Beginner

math_func.py

def add(x, y):
    return x + y

def product(x, y):
    return x * y

test_math_func.py

from math_func import add, product
import pytest
import sys

# @pytest.mark.skip(reason="Do not run number add test")
# @pytest.mark.skipif(sys.version_info < (3, 3), reason="Do not run number add test")
@pytest.mark.number
def test_add():
    assert add(3, 4) == 7
    print(f'Inside test_add: {add(3, 4)}')  # visible when run with the -s option

@pytest.mark.parametrize('arg1, arg2, result',
                         [
                             (7, 3, 10),
                             ('Hello', ' World', 'Hello World'),
                             (5.5, 4.5, 10),
                         ])
def test_add_parametrize(arg1, arg2, result):
    assert add(arg1, arg2) == result

@pytest.mark.number
def test_product():
    assert product(3, 4) == 12
 
@pytest.mark.strings
def test_add_strings():
    result = add('Hello', ' World')
    assert result == 'Hello World'
    assert 'Hello' in result
    assert type(result) is str

@pytest.mark.strings
def test_product_strings():
    assert product('Hello', 3) == 'HelloHelloHello'

How to run
pytest test_math_func.py
pytest test_math_func.py -v

pytest test_math_func.py::test_add

(-k expression)
pytest -v -k "add"  # runs every test with "add" in its name (test_add, test_add_parametrize, test_add_strings)
pytest -v -k "add or string"  # runs tests matching either word
pytest -v -k "add and string"  # runs test_add_strings alone

(-m, --markers)
pytest -v -m number  # run tests marked @pytest.mark.number
pytest -v -m strings  # run tests marked @pytest.mark.strings
# Register custom markers in pytest.ini (see Tutorial 3) to avoid "unknown mark" warnings


(-x, --exitfirst)
pytest -v -x  # exit on the first failure and skip the remaining tests

pytest -v -x --tb=no  # suppress the error traceback

pytest -v --maxfail=2  # stop after two failures

pytest -v -s  # -s shows output from print() calls

pytest -q  # quiet mode: minimal output (-q and -v cancel each other, so use one)

pytest Tutorial 2 - Fixtures (see the Jan 11 post above)




Jan 10, 2020

Pandas Dataframes

# Info - lists all of our columns, also gives data types
df.info()

# Set options
pd.set_option('display.max_columns', 85)
pd.set_option('display.max_rows', 85)

# head
df.head(10)

# tail
df.tail(10)

# List all columns
df.columns

# Dataframe shape
print(df1.shape)

# A Series is a single column in a data frame
# A data frame is a container for multiple Series (columns)
import pandas as pd

people = {
    'first': ['John', 'Mary', 'Linda'],
    'last': ['Doe', 'Lee', 'Doe'],
    'email': ['johndoe@gmail.com', 'marylee@gmail.com', 'lindadoe@gmail.com']
}
df = pd.DataFrame(people)
type(df['email'])  # gives pandas.core.series.Series
df['email']  # gives the email column as a Series
df.email  # same as above, via attribute access

# pd.DataFrame.from_records (list of tuples)
records = [('John', 30, 'Bangalore'), ('Doe', 25, 'Chennai')]  # avoid naming this "list": that shadows the builtin
df = pd.DataFrame.from_records(records, columns=['Name', 'Age', 'City'])
print(df.head(10))

df['email'] vs df.email
  • df['email'] is safer to use than df.email
  • shape is an attribute of the data frame; suppose you also have a column named shape in your DF
  • df['shape'] will give you the shape column
  • df.shape will return the (rows, columns) attribute, not your column, as the sketch below shows
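
A quick sketch of that gotcha, reusing the people DF above with a hypothetical shape column:

df['shape'] = ['circle', 'square', 'dot']
df['shape']  # the column you just added
df.shape     # (3, 4): the DataFrame attribute wins over the column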

# iloc vs loc (integer location vs label location)
# iloc (search by position)
    df.iloc[0]  # returns a Series for the row at position 0
    df.iloc[[0, 1]]  # returns the first two rows as a DataFrame
    df.iloc[[0, 1], 2]  # returns the email (column at position 2) of the first two rows
# loc (search by label) (row_indexer, column_indexer)
    df.loc[0]  # same as iloc here: returns a Series
    df.loc[[0, 1]]  # same as iloc: returns a DataFrame
    df.loc[[0, 1], 'email']  # instead of a position, you can use the column name
    df.loc[[0, 1], ['email', 'last_name']]  # filter by multiple column names for the first two rows
    df.loc[0, 'Hobby']  # value of the Hobby column for the first row
    df.loc[0:5, 'Hobby':'Employment']  # Hobby through Employment columns for the first six rows; loc slicing is inclusive (see df.columns for all the columns)

df.loc[df['email'] == 'abc@gmail.com', :]  # all columns of the rows with a specific email

# Get row by index
       Name  Age PinCode
0    Alex   10     500
1     Bob   12     600
2  Clarke   13     589

df.loc[2]  # Gets Clarke's row by its index label
df1 = df.set_index('Name')  # Set Name as the index (inplace=True is also available)
print(df1)

        Age  PinCode
Name
Alex     10      500
Bob      12      600
Clarke   13      589

df1.loc['Clarke']  # You can get the row by passing Name, since Name is the index


# Count the occurrences of each unique value in a column
df["gender"].value_counts()
Male      100
Female    200

# Unique
ll = ['John', 'Doe', 'Tom', 'Doe']
df1 = pd.Series(ll)
df2 = pd.unique(df1)
print(type(df2))  # <class 'numpy.ndarray'>
print(df2)        # ['John' 'Doe' 'Tom']

# Filter
df1_link_keys = df1[df1.link_key != '']
df1_no_link_keys = df1[df1.link_key == '']

# Filter multiple/specific columns
df1 = df[['first', 'last', 'email']]  # Gives DF in return

# Filter isin
df1_subset = df1[df1.index.isin([100, 200])]  # isin takes an iterable, not separate arguments

# Replace something with another
df1 = df1.replace('NULL', '')
df1 = df1.fillna('')

# Drop rows by index
df1 = df1.drop(matched_index_list)
df1 = df1.drop(100)

# Column to list
df1_names_list = df1['name'].tolist() # column values to list
df1_index_list = df1.index.values.tolist() # index values to list

# Drop duplicates
df1.drop_duplicates(keep='last', inplace=True)  # Keep the last occurrence

# Drop duplicates based on specific columns
df1.drop_duplicates(subset=['id', 'name', 'city', 'state'], keep='last', inplace=True)
df1.drop_duplicates(subset=['id', 'name', 'city', 'state'], keep='first', inplace=True)

df1 = df1.drop_duplicates(subset=['id', 'name', 'city', 'state'], keep='last')   # without inplace, returns a new DF
df1 = df1.drop_duplicates(subset=['id', 'name', 'city', 'state'], keep='first')  # without inplace, returns a new DF

# Drop duplicate rows whose state is empty
# If the only record for a group has an empty state, keep it
# If two otherwise-identical rows exist, one with a state and one empty, drop the empty one
s_maxes = df1.groupby(['id', 'name', 'city']).state.transform('max')
df1 = df1.loc[df1.state == s_maxes]
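
A minimal sketch of what that does, on hypothetical data:

import pandas as pd

df1 = pd.DataFrame({
    'id':    [1, 1, 2],
    'name':  ['A', 'A', 'B'],
    'city':  ['X', 'X', 'Y'],
    'state': ['KA', '', 'TN'],
})
s_maxes = df1.groupby(['id', 'name', 'city']).state.transform('max')
df1 = df1.loc[df1.state == s_maxes]
# id 1 keeps its 'KA' row ('' sorts before 'KA'); id 2 keeps its only row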

# Dataframe To Table
from sqlalchemy.types import VARCHAR, INTEGER

table_dtype = {
    'name': VARCHAR(),
    'city': VARCHAR(),
    'state': VARCHAR(),
    'pincode': INTEGER()
}
df1.to_sql(con=self.db_conn, name=table_name, dtype=table_dtype,
           if_exists='replace', index=False)

# Dataframe To JSON
df.to_json('data_frame.json', orient='table')
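
To load it back with the same orient:

df = pd.read_json('data_frame.json', orient='table')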

# From Table to Dataframe
raw_data_sql = """select id, name, city, state
                  from event_raw_data"""
df1 = pd.read_sql(raw_data_sql, con=self.db_conn, index_col="id")

# Loop Dataframe
for index, row in df1.iterrows():
    print(index, row['name'], row['city'], row['state'])  # use row['name']: row.name is the Series' name attribute (the index label)
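
For larger frames, itertuples is usually faster than iterrows; a quick sketch:

for row in df1.itertuples():
    print(row.Index, row.city, row.state)  # row.Index holds the index label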

# How to subtract rows of one pandas data frame from another?
# You want to achieve df1 - df2
df1 = df1[~df1.index.isin(df2.index)]

# Read CSV
df1 = pd.read_csv('data/test.csv')
df1.shape  # (10, 5)


SQLAlchemy commands

SQLAlchemy
##########
from sqlalchemy import func
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import create_engine

self.db_engine, self.db_conn = create_db_conn()

self.Base = automap_base()
self.Base.prepare(self.db_engine, reflect=True)
self.db_session = Session(self.db_engine)

self.table1_obj = self.Base.classes.table1
self.table2_obj = self.Base.classes.table2

DATABASE_URI = 'postgresql://postgres:password@localhost:5432/test'

def create_db_conn():
    db_engine = None
    db_conn = None
    try:
        db_engine = create_engine(DATABASE_URI)
        db_conn = db_engine.connect()
    except Exception as e:
        print(f'DB connection failed: {e}')
    return db_engine, db_conn



COUNT
#####
rows = vobj.db_session.query(self.table1_obj).count()
print(rows)
rows = vobj.db_session.query(self.table2_obj).count()
print(rows)

activities = self.db_session.query(self.table1_obj).order_by(
            self.table1_obj.id).all()

UPDATE
######
ea = self.db_session.query(self.table2_obj).get(self.activity_id)
ea.status = status
self.db_session.commit()

GET
####
current_activity = self.db_session.query(self.table1_obj).filter_by(
    activity_id=self.venue_base_activity_id,
    source_id=row['source_id'],
    venue_id=row['vid']).first()
if current_activity:
    link_key = current_activity.link_key
    market_place_df.loc[index, 'link_key'] = link_key

GET Specific Columns
####################
result = db_session.query(table1_obj.source_id, table1_obj.vid, table1_obj.name,
                              table1_obj.city, table1_obj.state)\
            .filter(table1_obj.source_id.in_((1, 2)))\
            .group_by(table1_obj.source_id, table1_obj.vid, table1_obj.name,
                      table1_obj.city, table1_obj.state)\
            .order_by(table1_obj.source_id).all()

for row in result:
    source_id = row[0]
    name = clean_string(row[2])  # clean_string is a project helper for normalizing values
    city = clean_string(row[3])
    state = clean_string(row[4])

GET Count
#########
ws_count = self.db_session.query(func.count(self.table1_obj.id))\
                            .filter_by(source_id=market_place2_source_id).scalar()

group_by
########
HygroRecord.query.group_by(HygroRecord.sensor_uuid)\
    .having(func.max(HygroRecord.timestamp)).all()
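
The intent here looks like "latest record per sensor"; a plain-session sketch of the same idea (assuming the HygroRecord model and a db_session):

rows = (db_session.query(HygroRecord.sensor_uuid,
                         func.max(HygroRecord.timestamp))
        .group_by(HygroRecord.sensor_uuid)
        .all())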

insert
######
self.db_session.add(
    self.table1_obj(  # the automapped class from Base.classes above
        id=row_id,
        name=name
    )
)
self.db_session.commit()

Insert - bulk insert
#####################
ll = []
ll.append(self.table2_obj(  # the automapped class from Base.classes above
    id=self.activity_id,
    name=name,
    city=city,
    state=state
))
self.db_session.bulk_save_objects(ll)
self.db_session.commit()


flush
#####

f = vobj.table1_obj(
    id1=9,
    id2=18
)
vobj.db_session.add(f)
vobj.db_session.flush()  # flush without commit if you want the generated insert id
print(f.id)
vobj.db_session.commit()