本文是 从 Flask 到 Gin 系列的第 4 篇。


本篇讲解在 Flask 和 Gin 中使用 MySQL 数据库的相关问题。在 Flask 中,我使用的是 Python 世界中最强大的 ORM 库: SQLAlchemy。在 Gin 的实现中,我选择了 gorm

Golang 中的 ORM 也是百花齐放的状态,如果你不喜欢 gorm ,也可以参考 awsome-go 来选择。

SQLAlchemy 的表定义

我博客中写过几篇关于 SQLAlchemy 的文章。在 Flask 的项目中,我使用的是 FlaskSQLAlchemy 这个插件,它对 SQLAlchemy 进行了简单的封装。

在我的项目中,连接了两个数据库:

  1. data1 数据库包含 active
  2. data2 数据库包含 register

下方的代码展示了这两个表在 SQLAlchemy 中的定义:

# -*- coding: utf-8 -*-
"""
app.models.audible
~~~~~~~~~~~~~~~~~~~

data1.active
data2.register
"""

# db 就是 flask_sqlalchemy.SQLAlchemy 的实例
from mjp.app import db


class LogActive(db.Model):
    """ Log 中的活跃数据
    """
    __tablename__ = 'active'
    __bind_key__ = 'data1'

    # 就是 regional
    gid = db.Column(db.INT, primary_key=True, index=True)

    # 代表日期
    date = db.Column(db.INT, primary_key=True, index=True)

    # 渠道 ID
    channel = db.Column(db.INT, primary_key=True, index=True)

    # 数量
    num = db.Column(db.INT, nullable=True)


class LogRegister(db.Model):
    """ Log 中的注册数据
    """
    __tablename__ = 'register'
    __bind_key__ = 'data2'

    # 就是 regional
    gid = db.Column(db.INT, primary_key=True, index=True)

    # 代表日期
    date = db.Column(db.INT, primary_key=True, index=True)

    # 渠道 ID
    channel = db.Column(db.INT, primary_key=True, index=True)

    # 数量
    num = db.Column(db.INT, nullable=True)

在 SQLAlchemy 中,可以很灵活的使用参数语法来制定表字段的定义。也可以使用 __tablename__ 来指定表名,使用 __bind_key__ 来指定这个表对应的数据库。

上面两个表的结构是一致的,它们的内容如下所示:

data2.register

对应的多数据库配置,默认的数据库指向 data1。这意味着如果不在 class 定义中指定 __bind_key__,则会认为这个表是位于 data1

{
    "SECRET_KEY": "YutjgVSPDERGyPayXrXbwsuF_SZWiVmUw3mD4YYD_kY=",
    "SQLALCHEMY_DATABASE_URI": "mysql+pymysql://zrong:123456@127.0.0.1/data1",
    "SQLALCHEMY_BINDS": {
        "data1": "mysql+pymysql://zrong:123456@127.0.0.1/data1",
        "data2": "mysql+pymysql://zrong:123456@127.0.0.1/data2"
    }
}

SQLAlchemy 中的多数据库查询

由于 data1.activedata2.register 这两个表的结构是一样的,我将其封装成同一个方法: _response_register_or_active,通过传入不同的表定义来实现查询。下面代码中的 responseto 方法的定义已经在 从 Flask 到 Gin —— 处理 JSON 一文中介绍过了。

def _parse_date(datestr):
    if datestr is None:
        return None
    return int(dt.strftime('%Y%m%d'))


def _response_register_or_active(DataTable):
    """ 提供一个数据表作为参数,数据表的字段名称必须相同
    返回一个响应对象
    :param DataTable: 数据表 Class

    :arg from_date: 起始日期
    :arg to_date: 终止日期
    """
    # get_request_values 是一个获取查询的封装,等同于在 request.args 中查询,很容易实现,在此就不列出定义了
    gids, from_date, to_date = get_request_values('gids', 'from_date', 'to_date', request_key='args')
    from_date = _parse_date(from_date)
    to_date = _parse_date(to_date)
    if from_date is None or to_date is None or gids is None:
        return responseto('请提供 from_date/to_date/gids!', code=401)
    try:
        gids = json.loads(gids)
        if not isinstance(gids, list) or len(gids) == 0:
            return responseto('gids 必须是一个列表!', code=401)
    except Exception as e:
        return responseto('gids 解析错误!', code=401)

    gids = [item.r for item in rall]
    results = DataTable.query.\
        filter(DataTable.gid.in_(gids)).\
        filter(and_(DataTable.date >= from_date, DataTable.date <= to_date)).\
        with_entities(DataTable.gid, DataTable.date, func.sum(DataTable.num).label('num')).\
        group_by(DataTable.gid, DataTable.date).\
        all()
    stat = [{
                'gid': item.gid,
                'num': int(item.num),
                'date': item.date
            } for item in results]
    return responseto(stat=stat, gids=gids)

定义两个不同的路由,直接调用上面的 _response_register_or_active 就可以实现根据提供的日期分组返回 MySQL 中的数据:

@audible.route('/register/', methods=['GET'])
def register():
    """ 获取注册数据
    """
    return _response_register_or_active(LogRegister)


@audible.route('/active/', methods=['GET'])
def active():
    """ 获取活跃数据
    """
    return _response_register_or_active(LogActive)

在对 LogRegister 和 LogActive 这两个表进行定义的时候,我们已经通过 __bind_key__ 指定了数据库,执行查询的时候,SQLAlchemy 会自行切换数据库进行查询。

让我们来测试一下:

curl --request GET \
  --url 'http://127.0.0.1:5001/audible/register/?from_date=20180801&to_date=20180802&gids=%5B1%2C53%5D'

结果为:

{
  "code": 200,
  "error": false,
  "gids": [
    1,
    53
  ],
  "stat": [
    {
      "date": 20180801,
      "gid": 1,
      "num": 819
    },
    {
      "date": 20180802,
      "gid": 1,
      "num": 840
    },
    {
      "date": 20180801,
      "gid": 53,
      "num": 680
    },
    {
      "date": 20180802,
      "gid": 53,
      "num": 624
    }
  ]
}

gorm 的表定义

activeregister 表的定义如下:

package models

import (
	"github.com/gin-gonic/gin"
)

// ActiveModel is a table for active users
type ActiveModel struct {
	Gid     int `gorm:"PRIMARY_KEY,AUTO_INCREMENT"`
	Date    int `gorm:"PRIMARY_KEY,INDEX"`
	Channel int `gorm:"PRIMARY_KEY,INDEX"`
	Num     int `gorm:"NOT NULL"`
}

// TableName for ActiveModel
func (ActiveModel) TableName() string {
	return "active"
}

// RegisterModel is a table for registered users
type RegisterModel struct {
	Gid     int `gorm:"PRIMARY_KEY,AUTO_INCREMENT"`
	Date    int `gorm:"PRIMARY_KEY,INDEX"`
	Channel int `gorm:"PRIMARY_KEY,INDEX"`
	Num     int `gorm:"NOT NULL"`
}

// TableName for RegisterModel
func (RegisterModel) TableName() string {
	return "register"
}

需要注意的是,在 gorm 中需要使用 Struct Tags 来定义字段属性,使用 TableName 方法来定义表的名称。

文档 gorm models 定义 中解释得不够清晰,几条常用的我总结一下:

  • 使用 column:列名 这样的语法来精确指定列名。
  • 使用 type:SQL 类型 这样的语法来精确指定列字段类型。
  • Struct Tags 中具体的定义之间可以使用分号或者逗号隔开。
  • Struct Tags 中不区分大小写。

下面是一个混合的范例,很清晰,不解释了:

// RegionalModel is a table for regional
type RegionalModel struct {
	R          int        `gorm:"type:smallint;PRIMARY_KEY,AUTO_INCREMENT"`
	Name       string     `gorm:"type:varchar(100);NOT NULL,INDEX"`
	Value      string     `gorm:"type:text;INDEX"`
	Status     int        `gorm:"type:smallint;NOT NULL"`
	CreateTime *time.Time `gorm:"type:smallint;column:createtime"`
	UpdateTime *time.Time `gorm:"type:smallint;column:updatetime"`
}

上面列出的 ActiveModelRegisterModel 是具体的数据表定义,要将数据库中查询到的数据进行 JSON 序列化,还需要一些代码配合。请参考:从 Flask 到 Gin —— 处理 JSON -> 序列化

gorm 中的多数据库初始化

database.go 中初始化多个数据库,以供使用:

package models

import (
	"fmt"
	"mjp/util"

	"github.com/jinzhu/gorm"
	_ "github.com/jinzhu/gorm/dialects/mysql" // mysql drive
)

// DefaultDB is a global object for MySQL call.
var DefaultDB *gorm.DB

// Data1DB is a global object for MySQL bind with name "data1"
var Data1DB *gorm.DB

// Data2DB is a global object for MySQL bind with name "data2"
var Data2DB *gorm.DB

func init() {
	conf := util.ConfigInstance()
	DefaultDB = initDB(conf.Databases.URI)
	Data1DB = initDB(conf.DatabaseBind("data1"))
	Data2DB = initDB(conf.DatabaseBind("data2"))
	// defer DefaultDB.Close()
}

func initDB(uri string) *gorm.DB {
	db, err := gorm.Open("mysql", uri)
	if err != nil {
		errmsg := fmt.Errorf("Connect %s error: %s", uri, err)
		panic(errmsg)
	}
	db.DB().SetMaxIdleConns(10)
	db.LogMode(true)
	db.SingularTable(true)
	return db
}

要了解 util.ConfigInstance 的具体实现,请参考:Flask 到 Gin —— 读取配置文件

gorm 中的多数据库查询

和 SQLAlchemy 类似,由于 data1.activedata2.register 这两个表的结构是一样的,我将其封装成同一个方法: registerOrActive,通过传入不同的表定义来实现查询。下面代码中的 re2q.Responseto 方法的定义已经在 从 Flask 到 Gin —— 处理 JSON 一文中介绍过了。

package routers

import (
	"fmt"
	"mjp/models"
	"mjp/re2q"
	"mjp/util"
	"strconv"
	"strings"

	"github.com/gin-gonic/gin"
	"github.com/jinzhu/gorm"
)

// 返回一个处理过的 gorm 查询,以便进一步处理
func registerOrActive(c *gin.Context, DataTable *gorm.DB) (defaultDBQry *gorm.DB) {
	gids, gok := c.GetQuery("gids")
	var gidsInt []int
	if gok {
		gidsString := strings.Split(gids, ",")
		gidsInt = make([]int, len(gidsString))
		for i, value := range gidsString {
			gidInt, gidErr := strconv.Atoi(value)
			if gidErr != nil {
				re2q.Responseto(c, fmt.Sprintf("Convert gid %s got a error!", value), 401)
				return
			}
			gidsInt[i] = gidInt
		}
	}

	fromDate, fromDateErr := strconv.Atoi(c.Query("from_date"))
	toDate, toDateErr := strconv.Atoi(c.Query("to_date"))
	if fromDateErr != nil || toDateErr != nil {
		re2q.Responseto(c, "Give from_date and to_date please!", 401)
		return
	}
	// gids 必须存在
	if gidsInt == nil {
		re2q.Responseto(c, "gids please!", 401)
		return
	}

	defaultDBQry = DataTable.Where("date BETWEEN ? AND ?", fromDate, toDate).Where("gid IN (?)", gidsInt)
	return
}

和 Flask 中类似,定义两个不同的路由,直接调用上面的 registerOrActive 就可以实现根据提供的日期分组返回 MySQL 中的数据:

package routers

import (
	"fmt"
	"mjp/models"
	"mjp/re2q"
	"mjp/util"
	"strconv"
	"strings"

	"github.com/gin-gonic/gin"
	"github.com/jinzhu/gorm"
)

// InitAudible register the audible api
func InitAudible(router *gin.RouterGroup) {
	router.GET("/register", AudibleRegister)
	router.GET("/active", AudibleActive)
}

// AudibleActive return active users list
func AudibleActive(c *gin.Context) {
    // 使用 Data1 数据库
	defaultDBQry := registerOrActive(c, models.Data1DB)
    // 查找活跃数据
    actives := []models.ActiveModel{}
    findError := defaultDBQry.Find(&actives).Error
    if findError != nil {
        re2q.Responseto(c, findError.Error(), 503)
        return
    }
    activesSerializer := models.ActivesSerializer{c, actives}
    re2q.ResponsetoWithData(c, gin.H{
        "stat":      activesSerializer.Response(),
        "code":      200,
    })
}

// AudibleRegister return registred user list
func AudibleRegister(c *gin.Context) {
    // 使用 Data2 数据库
	defaultDBQry := registerOrActive(c, models.Data2DB)
    // 查找注册数据
    registers := []models.RegisterModel{}
    findError := defaultDBQry.Find(&registers).Error
    if findError != nil {
        re2q.Responseto(c, findError.Error(), 503)
        return
    }
    registersSerializer := models.RegistersSerializer{c, registers}
    re2q.ResponsetoWithData(c, gin.H{
        "stat":      registersSerializer.Response(),
        "code":      200,
    })
}

这里的测试效果和 Python 版本完全相同。

参考


阅读系列所有文章:从 Flask 到 Gin

全文完