作者 | 周萝卜
来源 | 萝卜大杂烩
今天分享几段工作和生活中常用的代码,都是最基础的功能和操作,而且大多出现频率较高,很多都可以直接拿来使用,或者稍作修改后放到自己的项目当中。
import datetime


def get_nday_list(n):
    """Return the ``n`` calendar days before today, oldest first.

    Args:
        n: Number of days to go back. Today itself is not included.

    Returns:
        A list of ISO-formatted date strings (``YYYY-MM-DD``) running from
        ``n`` days ago up to yesterday. The list is empty when ``n`` is zero
        or negative.
    """
    # Read the clock once so every entry is relative to the same day, even
    # if the call happens to straddle midnight.
    today = datetime.date.today()
    return [
        str(today - datetime.timedelta(days=offset))
        for offset in range(n, 0, -1)
    ]


# Example: the 30 days leading up to (but excluding) today.
a = get_nday_list(30)
print(a)
['2021-12-23', '2021-12-24', '2021-12-25', '2021-12-26', '2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30', '2021-12-31', '2022-01-01', '2022-01-02', '2022-01-03', '2022-01-04', '2022-01-05', '2022-01-06', '2022-01-07', '2022-01-08', '2022-01-09', '2022-01-10', '2022-01-11', '2022-01-12', '2022-01-13', '2022-01-14', '2022-01-15', '2022-01-16', '2022-01-17', '2022-01-18', '2022-01-19', '2022-01-20', '2022-01-21']
import datetime


def create_assist_date(datestart=None, dateend=None):
    """Build an inclusive list of consecutive days as ``YYYY-MM-DD`` strings.

    Args:
        datestart: First day, formatted ``YYYY-MM-DD``. Defaults to
            ``'2016-01-01'`` when ``None``.
        dateend: Last day, formatted ``YYYY-MM-DD``. Defaults to the current
            local date when ``None``.

    Returns:
        Every day from ``datestart`` through ``dateend``, both ends included.
        An empty list when ``datestart`` falls after ``dateend``.

    Raises:
        ValueError: If either argument does not match ``YYYY-MM-DD``.
    """
    date_format = '%Y-%m-%d'
    if datestart is None:
        datestart = '2016-01-01'
    if dateend is None:
        dateend = datetime.datetime.now().strftime(date_format)

    # Parse both bounds so the span can be computed with date arithmetic.
    first_day = datetime.datetime.strptime(datestart, date_format)
    last_day = datetime.datetime.strptime(dateend, date_format)

    # A negative span (start after end) yields an empty range and therefore
    # an empty list, rather than a list holding only the start date.
    span = (last_day - first_day).days
    return [
        (first_day + datetime.timedelta(days=step)).strftime(date_format)
        for step in range(span + 1)
    ]


d_list = create_assist_date(datestart='2021-12-27', dateend='2021-12-30')
# Notebook-style echo of the result; has no effect when run as a script.
d_list
['2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30']
def save_data(data, date):
    """Append scraped records to the per-day file ``2021_data_{date}.csv``.

    The header row is written only when the file does not exist yet, so the
    function can be called repeatedly for the same day (for example once per
    result page).

    Args:
        data: Iterable of mappings, each providing the keys ``title``,
            ``extra``, ``time`` and ``url``.
        date: Value substituted into the file name.

    Raises:
        KeyError: If a record lacks one of the four keys.
    """
    # Imported locally because this snippet carries no import block of its own.
    import csv
    import os

    path = "2021_data_%s.csv" % date
    is_new_file = not os.path.exists(path)

    # newline='' hands line-ending control to the csv writer, which is told
    # to terminate every row with a bare '\n'.
    with open(path, "a", encoding='utf-8', newline='') as f:
        writer = csv.writer(f, lineterminator='\n')
        if is_new_file:
            writer.writerow(["标题", "热度", "时间", "url"])
        # csv.writer quotes any field containing a comma, quote or newline,
        # so a title with commas still lands in a single column. Note that
        # the csv module writes None as an empty field.
        writer.writerows(
            (record["title"], record["extra"], record["time"], record["url"])
            for record in data
        )
def pie_rosetype(data) -> Pie:
    """Build a rose-type pie chart drawn on a vertical gradient background.

    ``Pie``, ``opts`` and ``JsCode`` are not imported in this snippet and
    must already be in scope. NOTE(review): they look like the pyecharts
    names ``pyecharts.charts.Pie``, ``pyecharts.options`` and
    ``pyecharts.commons.utils.JsCode`` -- confirm.

    Args:
        data: Series data handed straight to ``Pie.add``; presumably a
            sequence of ``(name, value)`` pairs -- verify against the caller.

    Returns:
        The configured chart object. Nothing is rendered here.
    """
    # JavaScript expression evaluated by the chart front end. The space after
    # ``new`` is required for it to parse as a constructor call.
    background_color_js = (
        "new echarts.graphic.LinearGradient(0, 0, 0, 1, "
        "[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)"
    )
    c = (
        Pie(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))
        .add(
            "",
            data,
            radius=["30%", "75%"],
            center=["45%", "50%"],
            rosetype="radius",
            label_opts=opts.LabelOpts(formatter="{b}:{c}"),
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(title=""),
        )
    )
    return c
import requests

# Browser-like request headers. Replace the cookie placeholder with a real
# session cookie before use.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
    'cookie': 'some_cookie'
}

# NOTE: ``url`` is not defined in this snippet and must be set beforehand.
# requests applies no timeout by default, so one is passed explicitly to keep
# the call from blocking forever on a stalled server.
response = requests.request("GET", url, headers=headers, timeout=10)
import requests

# Form fields and file uploads to send; both are empty in this template.
payload = {}
files = []

# Browser-like request headers. Replace the cookie placeholder with a real
# session cookie before use.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
    'cookie': 'some_cookie'
}

# NOTE: ``url`` is not defined in this snippet and must be set beforehand.
# requests applies no timeout by default, so one is passed explicitly to keep
# the call from blocking forever on a stalled server.
response = requests.request("POST", url, headers=headers, data=payload, files=files, timeout=10)
def get_data(mydate, max_pages=99):
    """Fetch paged results for each day starting at ``mydate`` and save them.

    The list of days comes from ``create_assist_date(mydate)``. For each day,
    pages are requested in order starting at 1; paging stops at the first
    page whose ``data.items`` is empty, or after ``max_pages`` pages.
    Non-empty pages are passed to ``save_data(items, day)``.

    Args:
        mydate: Start day, passed through to ``create_assist_date``.
        max_pages: Upper bound on the number of pages requested per day.

    Raises:
        requests.RequestException: On connection failure or timeout.
        KeyError: If the JSON reply lacks ``data`` or ``items``.
    """
    date_list = create_assist_date(mydate)
    url = "https://test.test"
    files = []
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
        'cookie': ''
    }
    for d in date_list:
        for page in range(1, max_pages + 1):
            payload = {
                'p': str(page),
                'day': d,
                'nodeid': '1',
                't': 'itemsbydate',
                'c': 'node',
            }
            print("get data of %s in page %s" % (d, page))
            # Explicit timeout: requests would otherwise wait indefinitely.
            response = requests.request(
                "POST", url, headers=headers, data=payload, files=files, timeout=10
            )
            items = response.json()['data']['items']
            if not items:
                # An empty page marks the end of this day's results.
                break
            save_data(items, d)
import redis

# Lazily created pool shared by every client this module hands out.
_POOL = None


def redis_conn_pool():
    """Return a Redis client backed by one module-wide connection pool.

    The pool targets ``localhost:6379`` with ``decode_responses=True`` and is
    created on the first call. Later calls reuse it, so connections are
    shared between clients instead of each call opening its own pool.

    Returns:
        A ``redis.Redis`` client bound to the shared pool.
    """
    global _POOL
    if _POOL is None:
        _POOL = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
    return redis.Redis(connection_pool=_POOL)
from redis_conn import redis_conn_pool

# Obtain a client from the helper and store a sample key.
rd = redis_conn_pool()
rd.set('test_data', 'mytest')
from urllib.parse import quote_plus

from pymongo import MongoClient

# Username and password are percent-escaped so that characters such as
# '@', ':' or '/' in real credentials cannot corrupt the connection URI.
conn = MongoClient(
    "mongodb://%s:%s@ipaddress:49974/mydb"
    % (quote_plus('username'), quote_plus('password'))
)
db = conn.mydb
mongo_collection = db.mydata

# NOTE: ``requests``, ``url`` and ``query`` are not defined in this snippet
# and must be in scope beforehand. The timeout keeps a stalled server from
# blocking the script indefinitely.
res = requests.get(url, params=query, timeout=10).json()
commentList = res['data']['commentList']

# insert_many() rejects an empty list, so only write when there is data.
if commentList:
    mongo_collection.insert_many(commentList)
import MySQLdb

# Open the database connection.
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8')
try:
    # Obtain a cursor for running statements.
    cursor = db.cursor()
    # Ask the server for its version string.
    cursor.execute("SELECT VERSION()")
    # fetchone() returns a single result row.
    data = cursor.fetchone()
    print("Database version : %s" % data)
finally:
    # Close the connection even when the query fails.
    db.close()
Database version : 5.0.45
import os

import pandas as pd

# Collect every CSV file in the current directory into one table, tagging
# each row with the day taken from the file name.
df_list = []
# Sorted so the merged output does not depend on directory listing order.
for i in sorted(os.listdir()):
    # Match on the extension only; a bare substring test would also pick up
    # names such as "csv_notes.txt".
    if i.endswith(".csv"):
        # The day is the text after the last underscore, before the first dot.
        day = i.split('.')[0].split('_')[-1]
        df = pd.read_csv(i)
        df['day'] = day
        df_list.append(df)

# pd.concat() raises on an empty list, so only merge when something was read.
if df_list:
    df = pd.concat(df_list, axis=0)
    df.to_csv("total.txt", index=False)
import threading
import time

# Set to a truthy value to make print_time() stop before its next tick.
exitFlag = 0


class myThread(threading.Thread):
    """Thread that prints a timestamp five times, ``delay`` seconds apart."""

    def __init__(self, threadID, name, delay):
        """Store the numeric id, thread name and per-tick delay in seconds."""
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.delay = delay

    def run(self):
        """Announce start, emit five timestamps via print_time, announce exit."""
        print("开始线程:" + self.name)
        print_time(self.name, self.delay, 5)
        print("退出线程:" + self.name)


def print_time(threadName, delay, counter):
    """Print ``counter`` timestamped lines, sleeping ``delay`` seconds before each.

    Args:
        threadName: Label placed at the start of every line.
        delay: Seconds to sleep before each line.
        counter: Number of lines to print; zero or negative prints nothing.

    Returns early, without printing, as soon as the module-level ``exitFlag``
    is truthy.
    """
    while counter > 0:
        if exitFlag:
            # threadName is a plain string, so there is nothing to call
            # exit() on; returning lets the calling thread finish normally.
            return
        time.sleep(delay)
        print("%s:%s" % (threadName, time.ctime(time.time())))
        counter -= 1
# Build both worker threads up front.
thread1, thread2 = myThread(1, "Thread-1", 1), myThread(2, "Thread-2", 2)

# Launch them, then block until each one has finished.
for worker in (thread1, thread2):
    worker.start()
for worker in (thread1, thread2):
    worker.join()

print("退出主线程")
import asyncio

import aiofiles
import aiohttp
async def get_html(session, url, retries=3):
    """Fetch ``url`` through ``session`` and return the body decoded as UTF-8.

    Args:
        session: Object whose ``get(url=..., timeout=...)`` returns an async
            context manager yielding a response with a ``status`` attribute
            and an awaitable ``text()``; presumably an
            ``aiohttp.ClientSession`` -- TODO confirm.
        url: Address to request.
        retries: Number of extra attempts made after an attempt raises.
            Negative values are treated as zero.

    Returns:
        The response text for a 2xx reply. ``None`` for any other status
        (which is reported but not retried) or when every attempt raised.
    """
    # A bounded loop replaces unbounded self-recursion: a permanently failing
    # URL now gives up instead of recursing forever, and a successful retry
    # actually hands its text back to the caller.
    for _ in range(max(retries, 0) + 1):
        try:
            async with session.get(url=url, timeout=8) as resp:
                if resp.status // 100 != 2:
                    print(resp.status)
                    print("爬取", url, "出现错误")
                    return None
                # The decoding is chosen through text()'s own argument.
                return await resp.text(encoding='utf-8')
        except Exception as e:
            print("出现错误", e)
    return None
async def download(title_list, content_list):
    """Append the text form of ``content_list`` to a file named after the first title.

    The target is ``title_list[0]`` with ``.txt`` appended, opened in append
    mode as UTF-8 through ``aiofiles``.

    Args:
        title_list: Sequence whose first element supplies the file name stem.
        content_list: Object written as ``str(content_list)``.

    Raises:
        IndexError: If ``title_list`` is empty.
    """
    target = '{}.txt'.format(title_list[0])
    async with aiofiles.open(target, 'a', encoding='utf-8') as f:
        await f.write(str(content_list))
分享
点收藏
点点赞
点在看
文章转发自AI科技大本营微信公众号,版权归其所有。文章内容不代表本站立场和任何投资暗示。
Copyright © 2021.Company 元宇宙YITB.COM All rights reserved.元宇宙YITB.COM