Compare commits
25 Commits
Author | SHA1 | Date |
---|---|---|
namhq1989 | 9b0fbe90c5 | |
namhq1989 | ba5c955163 | |
namhq1989 | 61dc703927 | |
Nam Huynh | 1e281e2174 | |
namhq1989 | 689cf7e129 | |
namhq1989 | dc50f7e893 | |
sinhluu | 48899c3733 | |
Sinh | 43c6bb8d9e | |
sinhluu | 4f10c2063e | |
Sinh | 8a1130909d | |
trunglt251292 | bfd60c8faa | |
trunglt251292 | 137e26646d | |
trunglt251292 | b4bc00336f | |
nguyenphamquangtue | 44182de030 | |
Tue | 39267af9ad | |
nguyenphamquangtue | 44034adbcb | |
Tue | 2d04035236 | |
nguyenphamquangtue | e84d973c29 | |
Tue | 5117de884d | |
nguyenphamquangtue | 369a3a003d | |
Tue | 244a7b9e2a | |
namhq1989 | d6ac97d15b | |
namhq1989 | 8da21b1024 | |
namhq1989 | 8abf9ac5ed | |
namhq1989 | 2e42b0a20a |
|
@ -0,0 +1,128 @@
|
||||||
|
package postgresql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/volatiletech/sqlboiler/v4/boil"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
indexSign = "$%d"
|
||||||
|
decodeTag = "boil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type bulkInsertItem map[string]interface{}
|
||||||
|
|
||||||
|
type BulkInsertPayload struct {
|
||||||
|
TableName string
|
||||||
|
Data interface{}
|
||||||
|
Columns []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func BulkInsert(ctx context.Context, db boil.ContextExecutor, payload BulkInsertPayload) error {
|
||||||
|
// convert data to map
|
||||||
|
insertRows, err := toMap(payload.Data)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("[postgresql] error when convert data to map: %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
numOfColumns = len(payload.Columns)
|
||||||
|
incSigns = make([]string, 0)
|
||||||
|
insertValues = make([]interface{}, 0)
|
||||||
|
listOfSigns = make([]string, numOfColumns)
|
||||||
|
listOfColumns = make([]string, numOfColumns)
|
||||||
|
)
|
||||||
|
|
||||||
|
// prepare array of dollar signs
|
||||||
|
for i := range listOfSigns {
|
||||||
|
listOfSigns[i] = indexSign
|
||||||
|
listOfColumns[i] = payload.Columns[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepare sign for every column
|
||||||
|
signs := strings.Join(listOfSigns, ",")
|
||||||
|
insertColumns := strings.Join(listOfColumns, ",")
|
||||||
|
|
||||||
|
for index, r := range insertRows {
|
||||||
|
currentIncSigns := getIncSignValues(index, numOfColumns)
|
||||||
|
incSigns = append(incSigns, fmt.Sprintf("("+signs+")",
|
||||||
|
currentIncSigns...,
|
||||||
|
))
|
||||||
|
|
||||||
|
currentInsertValues := getInsertValues(r, payload.Columns, numOfColumns)
|
||||||
|
insertValues = append(insertValues, currentInsertValues...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// exec
|
||||||
|
stm := getSQLStatement(payload.TableName, insertColumns, incSigns)
|
||||||
|
_, err = db.ExecContext(ctx, stm, insertValues...)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("[postgresql] error when insert to db: %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSQLStatement(tableName, insertColumns string, incSigns []string) string {
|
||||||
|
return fmt.Sprintf(`
|
||||||
|
insert into %s (
|
||||||
|
%s
|
||||||
|
)
|
||||||
|
values %s
|
||||||
|
`, tableName, insertColumns, strings.Join(incSigns, ","))
|
||||||
|
}
|
||||||
|
|
||||||
|
func getIncSignValues(index, numOfColumns int) (result []interface{}) {
|
||||||
|
for i := 1; i <= numOfColumns; i++ {
|
||||||
|
result = append(result, index*numOfColumns+i)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func getInsertValues(row bulkInsertItem, columns []string, numOfColumns int) (result []interface{}) {
|
||||||
|
for i := 0; i < numOfColumns; i++ {
|
||||||
|
result = append(result, row[columns[i]])
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func toMap(input interface{}) ([]bulkInsertItem, error) {
|
||||||
|
result := make([]bulkInsertItem, 0)
|
||||||
|
|
||||||
|
v := reflect.ValueOf(input)
|
||||||
|
if v.Kind() == reflect.Ptr {
|
||||||
|
v = v.Elem()
|
||||||
|
}
|
||||||
|
|
||||||
|
// only accept slices
|
||||||
|
if v.Kind() != reflect.Slice {
|
||||||
|
err := fmt.Errorf("toMap only accepts slices; got %T", v)
|
||||||
|
fmt.Printf("[postgresql] invalid type toMap: %s", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// loop and assign data to result
|
||||||
|
for x := 0; x < v.Len(); x++ {
|
||||||
|
item := bulkInsertItem{}
|
||||||
|
typ := v.Index(x).Type()
|
||||||
|
|
||||||
|
// loop each field
|
||||||
|
for i := 0; i < v.Index(x).NumField(); i++ {
|
||||||
|
fi := typ.Field(i)
|
||||||
|
t := fi.Tag.Get(decodeTag)
|
||||||
|
if t != "" {
|
||||||
|
// set key of map to value in struct field
|
||||||
|
item[t] = v.Index(x).Field(i).Interface()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = append(result, item)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
package postgresql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/shopspring/decimal"
|
||||||
|
"github.com/volatiletech/null/v8"
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetString(val null.String) string {
|
||||||
|
if !val.Valid {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return val.String
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetInt(val null.Int) int {
|
||||||
|
if !val.Valid {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return val.Int
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetBool(val null.Bool) bool {
|
||||||
|
if !val.Valid {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return val.Bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetTime(val null.Time) (res time.Time) {
|
||||||
|
if !val.Valid {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return val.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetJSON(val null.JSON) (res []byte) {
|
||||||
|
if !val.Valid {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return val.JSON
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFloat64 ...
|
||||||
|
func GetFloat64(val decimal.Decimal) float64 {
|
||||||
|
return val.InexactFloat64()
|
||||||
|
}
|
53
go.mod
53
go.mod
|
@ -1,11 +1,52 @@
|
||||||
module github.com/Selly-Modules/postgresql
|
module git.selly.red/Selly-Modules/postgresql
|
||||||
|
|
||||||
go 1.16
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Masterminds/squirrel v1.5.0
|
github.com/Masterminds/squirrel v1.5.0
|
||||||
github.com/jmoiron/sqlx v1.3.4
|
github.com/golang-migrate/migrate/v4 v4.15.2
|
||||||
github.com/lib/pq v1.10.2
|
github.com/jackc/pgx/v4 v4.17.0
|
||||||
github.com/logrusorgru/aurora v2.0.3+incompatible
|
github.com/shopspring/decimal v1.3.1
|
||||||
go.elastic.co/apm/module/apmsql v1.14.0
|
github.com/volatiletech/null/v8 v8.1.2
|
||||||
|
github.com/volatiletech/sqlboiler/v4 v4.12.0
|
||||||
|
go.elastic.co/apm/module/apmsql/v2 v2.2.0
|
||||||
|
golang.org/x/text v0.6.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/armon/go-radix v1.0.0 // indirect
|
||||||
|
github.com/elastic/go-licenser v0.4.1 // indirect
|
||||||
|
github.com/elastic/go-sysinfo v1.9.0 // indirect
|
||||||
|
github.com/elastic/go-windows v1.0.1 // indirect
|
||||||
|
github.com/friendsofgo/errors v0.9.2 // indirect
|
||||||
|
github.com/gofrs/uuid v4.0.0+incompatible // indirect
|
||||||
|
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||||
|
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||||
|
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
|
||||||
|
github.com/jackc/pgconn v1.13.0 // indirect
|
||||||
|
github.com/jackc/pgio v1.0.0 // indirect
|
||||||
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
|
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
|
||||||
|
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||||
|
github.com/jackc/pgtype v1.12.0 // indirect
|
||||||
|
github.com/jcchavezs/porto v0.4.0 // indirect
|
||||||
|
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
|
||||||
|
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
|
||||||
|
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
|
||||||
|
github.com/lib/pq v1.10.2 // indirect
|
||||||
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
|
github.com/prometheus/procfs v0.9.0 // indirect
|
||||||
|
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
|
||||||
|
github.com/volatiletech/inflect v0.0.1 // indirect
|
||||||
|
github.com/volatiletech/randomize v0.0.1 // indirect
|
||||||
|
github.com/volatiletech/strmangle v0.0.4 // indirect
|
||||||
|
go.elastic.co/apm/v2 v2.2.0 // indirect
|
||||||
|
go.elastic.co/fastjson v1.1.0 // indirect
|
||||||
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
|
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
|
||||||
|
golang.org/x/mod v0.7.0 // indirect
|
||||||
|
golang.org/x/sys v0.4.0 // indirect
|
||||||
|
golang.org/x/tools v0.5.0 // indirect
|
||||||
|
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||||
|
howett.net/plist v1.0.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
package postgresql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/golang-migrate/migrate/v4"
|
||||||
|
"github.com/golang-migrate/migrate/v4/database/postgres"
|
||||||
|
_ "github.com/golang-migrate/migrate/v4/source/file"
|
||||||
|
)
|
||||||
|
|
||||||
|
func runMigration(db *sql.DB, server string) {
|
||||||
|
// init migrate data
|
||||||
|
driver, _ := postgres.WithInstance(db, &postgres.Config{})
|
||||||
|
m, err := migrate.NewWithDatabaseInstance(
|
||||||
|
fmt.Sprintf("file:///%s", getMigrationDirectoryPath(server)),
|
||||||
|
"postgres",
|
||||||
|
driver,
|
||||||
|
)
|
||||||
|
|
||||||
|
// up
|
||||||
|
if err == nil {
|
||||||
|
if err = m.Up(); err != nil && err.Error() != "no change" {
|
||||||
|
fmt.Printf("[postgresql] run migration error: %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Printf("⚡️[postgres]: done migration \n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMigrationDirectoryPath(server string) string {
|
||||||
|
migrationDir := fmt.Sprintf("/external/postgresql/%s/migrations/sql", server)
|
||||||
|
|
||||||
|
dirname, err := os.Getwd()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check path existed
|
||||||
|
path := dirname + migrationDir
|
||||||
|
if _, err = os.Stat(path); os.IsNotExist(err) {
|
||||||
|
// Create if not existed
|
||||||
|
err = os.Mkdir(path, 0755)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return path
|
||||||
|
}
|
|
@ -1,60 +1,72 @@
|
||||||
package postgresql
|
package postgresql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/jackc/pgx/v4/stdlib"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/volatiletech/sqlboiler/v4/boil"
|
||||||
_ "github.com/lib/pq" // For postgres dialect
|
"go.elastic.co/apm/module/apmsql/v2"
|
||||||
"github.com/logrusorgru/aurora"
|
_ "go.elastic.co/apm/module/apmsql/v2/pgxv4"
|
||||||
|
|
||||||
"go.elastic.co/apm/module/apmsql"
|
|
||||||
_ "go.elastic.co/apm/module/apmsql/pq" // For apm dialect
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
type Config struct {
|
||||||
sqlxClient *sqlx.DB
|
Host string
|
||||||
)
|
Port int
|
||||||
|
User string
|
||||||
|
Password string
|
||||||
|
DBName string
|
||||||
|
SSLMode string
|
||||||
|
IsDebug bool
|
||||||
|
MaxOpenConnections int
|
||||||
|
MaxIdleConnections int
|
||||||
|
ConnectionLifetime time.Duration
|
||||||
|
UseElasticAPM bool
|
||||||
|
}
|
||||||
|
|
||||||
// Connect to postgresql database
|
// Connect ...
|
||||||
func Connect(host, user, password, dbname, port, sslmode string) error {
|
func Connect(cfg Config, server string) (db *sql.DB, err error) {
|
||||||
// Connect string
|
uri := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
|
||||||
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=%s TimeZone=UTC",
|
cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode)
|
||||||
host, user, password, dbname, port, sslmode,
|
|
||||||
)
|
|
||||||
|
|
||||||
// TODO: write case for SSL mode
|
// connect
|
||||||
|
if cfg.UseElasticAPM {
|
||||||
// db, err := sqlx.Connect("postgres", dsn)
|
db, err = apmsql.Open("pgx", uri)
|
||||||
// if err != nil {
|
} else {
|
||||||
// log.Fatalln(err)
|
db, err = sql.Open("pgx", uri)
|
||||||
// }
|
}
|
||||||
|
|
||||||
apmDB, err := apmsql.Open("postgres", dsn)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
db := sqlx.NewDb(apmDB, "postgres")
|
// ping
|
||||||
|
if err = db.Ping(); err != nil {
|
||||||
fmt.Println(aurora.Green("*** CONNECTED TO POSTGRESQL - SQLX: " + dsn))
|
fmt.Printf("[postgresql] pgx ping error: %s", err.Error())
|
||||||
|
return nil, err
|
||||||
// Config connection pool
|
|
||||||
sqlDB := db.DB
|
|
||||||
sqlDB.SetMaxOpenConns(100)
|
|
||||||
sqlDB.SetMaxIdleConns(20)
|
|
||||||
sqlDB.SetConnMaxLifetime(time.Minute * 5)
|
|
||||||
|
|
||||||
// Assign client
|
|
||||||
sqlxClient = db
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetSqlxInstance ...
|
// config
|
||||||
func GetSqlxInstance() *sqlx.DB {
|
if cfg.MaxOpenConnections == 0 {
|
||||||
return sqlxClient
|
cfg.MaxOpenConnections = 25
|
||||||
}
|
}
|
||||||
|
if cfg.MaxIdleConnections == 0 {
|
||||||
|
cfg.MaxIdleConnections = 25
|
||||||
|
}
|
||||||
|
if cfg.ConnectionLifetime == 0 {
|
||||||
|
cfg.ConnectionLifetime = 5 * time.Minute
|
||||||
|
}
|
||||||
|
db.SetMaxOpenConns(cfg.MaxOpenConnections)
|
||||||
|
db.SetMaxIdleConns(cfg.MaxIdleConnections)
|
||||||
|
db.SetConnMaxLifetime(cfg.ConnectionLifetime)
|
||||||
|
|
||||||
|
// run migration
|
||||||
|
runMigration(db, server)
|
||||||
|
|
||||||
|
// debug mode
|
||||||
|
boil.DebugMode = cfg.IsDebug
|
||||||
|
|
||||||
|
fmt.Printf("⚡️[postgres]: connected to %s:%d, with APM: %t \n", cfg.Host, cfg.Port, cfg.UseElasticAPM)
|
||||||
|
|
||||||
|
return db, nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
package postgresql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/shopspring/decimal"
|
||||||
|
"github.com/volatiletech/null/v8"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SetString(val string) null.String {
|
||||||
|
return null.String{
|
||||||
|
String: val,
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetInt(val int) null.Int {
|
||||||
|
return null.Int{
|
||||||
|
Int: val,
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetBool(val bool) (res null.Bool) {
|
||||||
|
return null.Bool{
|
||||||
|
Bool: val,
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetTime(val time.Time) (res null.Time) {
|
||||||
|
if val.IsZero() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return null.Time{
|
||||||
|
Time: val,
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetJSON(val []byte) (res null.JSON) {
|
||||||
|
if val == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return null.JSON{
|
||||||
|
JSON: val,
|
||||||
|
Valid: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetFloat64(val float64) decimal.Decimal {
|
||||||
|
return decimal.NewFromFloat(val)
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package postgresql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/text/runes"
|
||||||
|
"golang.org/x/text/transform"
|
||||||
|
"golang.org/x/text/unicode/norm"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"unicode"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RemoveDiacritics ...
|
||||||
|
func RemoveDiacritics(s string) string {
|
||||||
|
if s != "" {
|
||||||
|
s = strings.ToLower(s)
|
||||||
|
s = replaceStringWithRegex(s, `đ`, "d")
|
||||||
|
t := transform.Chain(norm.NFD, runes.Remove(runes.In(unicode.Mn)), norm.NFC)
|
||||||
|
result, _, _ := transform.String(t, s)
|
||||||
|
result = replaceStringWithRegex(result, `[^a-zA-Z0-9\s]`, "")
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// replaceStringWithRegex ...
|
||||||
|
func replaceStringWithRegex(src string, regex string, replaceText string) string {
|
||||||
|
reg := regexp.MustCompile(regex)
|
||||||
|
return reg.ReplaceAllString(src, replaceText)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransformKeywordToSearchString ...
|
||||||
|
func TransformKeywordToSearchString(keyword string) string {
|
||||||
|
s := strings.Trim(keyword, " ")
|
||||||
|
s = RemoveDiacritics(s)
|
||||||
|
s = strings.ReplaceAll(s, " ", "&")
|
||||||
|
return s + ":*" // For prefix search
|
||||||
|
}
|
Loading…
Reference in New Issue