豪翔天下

Change My World by Program

0%

上周“被”上线了一个紧急项目,周五下班接到需求,周一开始思考解决方案,周三开发完成,周四走流程上线,也算是面向领导编程了。之前的项目里面由于是自运维,然后大多数又都赶时间,所以在处理定时任务上面基本都是自己在服务器上添加crontab,而不是让多个实例自己去处理定时任务的并发锁,之后Laravel 5.5开始自带并发锁,我们也打算尽快升级。但是这次项目是Python项目,无奈只能自己实现一下,以下这个方案实现起来非常简单且易于理解。这篇文章要解决的主要问题是:使用Redis锁处理并发问题,保证多进程仅有一个实例在运行,当运行中的实例down了后其它实例中的一个能顶上来,保证有且仅有一个实例在运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import redis
r = redis.Redis(...)

last_heart = 0 # 记录上一次得到的锁心跳
free_lock_try = 6 # 锁无心跳的最大次数

while not r.setnx('mylock', 1):
now_heart = r.get('mylock')
print(f"没获取到锁,now_heart={now_heart},last_heart={last_heart},free_lock_try={free_lock_try}")
if now_heart == last_heart:
free_lock_try = free_lock_try - 1
if free_lock_try == 0: # 锁已经1分钟没有心跳了
old_heart = r.getset('mylock', 1) # 将lock重置为1,并返回set之前的心跳值
if old_heart < now_heart:
time.sleep(10)
continue
else:
break # 成功获取到锁,退出循环
else:
free_lock_try = 6 # 锁有心跳,重置free_lock_try值
last_heart = now_heart
time.sleep(10)

def producer_exit():
"""程序正常退出时候自动清理锁"""
r.delete('mylock')
import atexit
atexit.register(producer_exit)

# 业务代码
while True:
r.incr('mylock') # 让锁心跳加一
...

我们来看看这段程序都解决了并发锁中的哪些问题

  1. 高并发下,多个进程无法同时获取到锁。这里使用的是redis.setnx,如果锁已经存在,其他进程是无法重置锁并获取到锁的。另外当多个进程同时发现有锁已经没有心跳了,使用的是redis.getset将心跳重置为1,都能set成功,但是get出来的值多个进程是不一样的,只有真正获取到锁的进程返回的是之前进程的心跳,而其他进程获取到的都是1。
  2. 有锁进程正常退出,可以使用atexit注册进程退出函数删除锁,这里也可以不要,不过下次启动得等新的进程等待几次心跳
  3. 有锁进程意外退出,退出后心跳不再增加,超过free_lock_try次数后,其他进程会重新设置并获取锁
  4. 所有进程全都意外退出,这个问题不是锁来关心的,可以使用supervisor进行守护进程。

由于之前用pyspark写出来的spark程序在运行一段时间后会突然卡住,导致任务堆积,最终内存溢出。所以最近又决定直接用spark原生的scala语言来重写程序,scala大体上和java语言的语法是兼容的。

基本变量

  • val用于声明常量,var用于声明变量
阅读全文 »

Django自带了强大的名为admin的后台管理功能,app名称为django.contrib.admin,它同时依赖了django.contrib.auth认证系统和django.contrib.sessions系统,当然,即使不用admin,后面两者都建议加上,不用重复造轮子。

  • 为了使用它,我们需要先使用migrate功能去创建相应的数据库表,直接执行python manage.py makemigrations && python manage.py migrate即可。运行程序后,直接访问http://127.0.0.1:8000/admin/就能访问admin了(一般admin的路由都是定义好了的,在urls.py中有 url(r'^admin/', admin.site.urls),)

  • 我们需要先创建一个超级管理员python manage.py createsuperuser,按照提示输入用户名密码即可用来登录了

  • 如果要让字段非必填,需要在定义model字段的时候就加上blank=True参数

  • 修改超级管理员密码可以这样做:

    1
    2
    3
    4
    5
    # python manage.py shell
    from django.contrib.auth.models import User
    user =User.objects.get(username='admin')
    user.set_password('new_password')
    user.save()
阅读全文 »

PHP上最好的时间处理工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 以下都是获得一个时间对象
Carbon::now(); // 获取当前时间,2018-01-01 11:11:11
Carbon::today(); // 获取今天的开始时间,2018-01-01 00:00:00
Carbon::tomorrow(); // 获取明天的开始时间,2018-01-02 00:00:00
Carbon::yesterday();// 获取昨天的开始时间,2017-12-31 00:00:00

# 解析时间,这个功能可以说是非常强大了,需要特别注意的是如果parse的字符串内部有带时区,那么解析后的对象也是自带时区的,可能跟当前时区是不一样的
Carbon::parse('2018-01-01');
Carbon::parse('2018-01-01 12:00:00');
Carbon::parse('today');
Carbon::parse('2 days ago');
Carbon::parse('+3 weeks');
Carbon::parse('last friday');
Carbon::parse('Fri May 31 2019 06:50:14 GMT+0000 (UTC)')->toDateTimeString(); // 这个会得到2019-05-31 06:50:14,而不是东8区的时间
Carbon::createFromFormat('Y-m-d H', '1975-05-21 22')->toDateTimeString(); // 1975-05-21 22:00:00
Carbon::createFromTimeStamp(1545800000);

# 获取时间字段
$time->timestamp; // 获取时间戳
$time->year;
$time->month;
$time->format('F'); // 获取月份全称,例如February
$time->day;
$time->hour;
$time->minute;
$time->second;
$time->micro;
$time->dayOfWeek; // 获取当前时间是一周的第几天
$time->dayOfYear; // 获取当前时间是一年的第几天
$time->weekOfMonth; // 获取当前时间是当月的第几周
$time->weekOfYear; // 获取当前时间是当年的第几周
$time->daysInMonth; // 获取当月有多少天
$time->startOfDay(); // 今天开始时间
$time->endOfDay(); // 今天结束时间
$time->startOfWeek(); // 这周开始时间
$time->endOfWeek(); // 这周结束时间
$time->startOfMonth(); // 这个月开始时间
$time->endOfMonth(); // 这个月结束时间

# 时间计算
$time->addDays(3);
$time->addWeeks(3);
$time->addHours(24);
$time->subHours(20);
$time->modify('-2 days');
$time->isWeekday(); // 是否是工作日
$time->isWeekend(); // 是否是周末
$time->isYesterday(); // 是否是昨天
$time->isTomorrow(); // 是否是明天

# 时间比较
$first->eq($second);
$first->ne($second);
$first->gt($second);
$first->gte($second);
$first->lt($second);
$first->lte($second);

# 获取指定格式输出
$time->toDateTimeString();
$time->subDays(5)->diffForHumans(); // 5天前
$time->format('m/y') // 指定输出格式: 12/2020

Troubleshooting

  • The timezone could not be found in the database: 通常是createFromFormat第一个参数格式没有设置

Django提供了非常方便的方法以供你自定义存储系统,只需要在项目的任意地方新建一个继承自django.core.files.sotrage.Storage的类即可。如果是使用七牛云,可以直接使用以下代码,当然首先要安装七牛的sdk: pip install qiniu:

阅读全文 »

最近公司有一个安全方面的业务,需要实时监控所有访客的情况。之前是定时去查询Elasticsearch的接口进行统计分析,但是这个时间间隔不好把握,并且Elasticsearch并不适合特别实时的查询操作。实时的分布式流计算引擎首推Spark,它与Hadoop等相比的优势在这里讲得比较清楚了。

  • **RDD(Resilient Distributed Dataset弹性分布式数据集)**:这是spark的主要数据概念。有多种来源,容错机制,并且能缓存、并行计算。RDD在整个计算流程中会经过不同方式的变换,这种变换关系就是一个有向无环图。
  • 需要注意的是,所有的方法在定义执行之前都是异步的,所以不能简单地在下面的方法外部添加try...catch...进行异常捕获,最好是在传入的函数里面进行异常的捕获(如果是lambda,请确认lambda不会报错,否则如果lambda报错整个程序都会报错并终止允许)
  • Spark应用程序可以使用大多数主流语言编写,这里使用的是python,只pip install pyspark即可
  • Stage(调度阶段): 每个Job会根据RDD大小切分城多个Stage,每个Stage包含一个TaskSet
  • TaskSet(任务集): 一组关联的Task集合,不过是没有依赖的
  • Task(任务): RDD中的一个分区对应一个Task。
  • Narrow Dependency(窄依赖): 比较简单的一对一依赖和多对一依赖(如union)
  • Shuffle Dependency(宽依赖): 父RDD的分区被多个子RDD分区所使用,这时父RDD的数据会被再次分割发送给子RDD
  • Spark 内存分配: 分为这三块:
    • execution: 执行内存,基本的算子都是在这里面执行的,这块内存满了就写入磁盘。
    • storage: 用于存储broadcast, cache, persist
    • other: 程序预留给自己的内存,这个可以不用考虑
  • Duration
    • batchDuration: 批次时间
    • windowDuration: 窗口时间,要统计多长时间内的数据,必须是batchDuration的整数倍
    • slideDuration: 滑动时间,窗口多长时间滑动一次,必须是batchDuration的整数倍,一般是跟batchDuration时间相同
阅读全文 »

N+1问题

考虑这样一个常见应用场景,前端页面上需要展示一个文章列表,其中包括了文章的标题,并且会同时显示每篇文章的作者名。那么我们可能会按下面几种方案来设计我们的API。

方案一: 对于Restful,设计下面两个接口,客户端总共需要请求1+N次接口,查询数据库1+N次。

1
2
/articles	# 文章列表接口
/articles/{article_id}/author # 获取文章作者信息接口
阅读全文 »

本讲将会介绍GraphQL的基础语法,毕竟业务永远都离不开增删改查。

除“读“操作可以直接与数据库model相映射以外,跟”写“有关的操作的后端定义依然是需要自己去实现业务相关的映射逻辑的。当然,无论读写都是需要定义相应的Model的,可以在Web上面查看已经定义的Mutation:

阅读全文 »

前面几讲讲了理论层面,大家应该对GraphQL不再陌生了。这里简单讲述一下本教程demo的搭建方式。

代码目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
.
├── Dockerfile
├── LICENSE
├── Pipfile
├── Pipfile.lock
├── README.md
├── database.sql # 初始数据库
├── db.py
├── managers # 封装数据库操作
│   ├── __init__.py
│   ├── article.py
│   ├── author.py
│   ├── comment.py
│   ├── ordinary_writer.py
│   └── professional_writer.py
├── models # 数据库映射对象
│   ├── __init__.py
│   ├── article.py
│   ├── author.py
│   ├── comment.py
│   ├── ordinary_writer.py
│   └── professional_writer.py
├── mutations # 操作变更定义
│   ├── __init__.py
│   ├── article.py
│   ├── author.py
│   └── comment.py
├── run.py # 主程序
├── schemas # 模型数据结构定义
│   ├── __init__.py
│   ├── article.py
│   ├── author.py
│   ├── comment.py
│   ├── interfaces.py # 接口schema
│   ├── ordinary_writer.py
│   └── professional_writer.py
├── settings.py # 数据库连接配置
└── web_template.py #
阅读全文 »

对于简单的GraphQL查询,其实很简单,任何一个会CRUD的开发者都知道,使用简单的if … else … 就能实现逐字段的遍历查询。这也就是GraphQL的核心算法,只是针对每个字段,GraphQL会提供一个resolver去实现特殊的字段获取方式,详情可以看demo代码,例如: https://github.com/haoflynet/graphql-tutorial/blob/master/schemas/article.py。

假设有这样一个查询:

1
2
3
4
5
6
7
8
{
articles {
title // SEELCT `title` FROM `articles`;
comments {
content // for id in article_ids: SELECT `content` FROM `comments` WHERE `article_id`=id;
}
}
}
阅读全文 »