Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
Nam Huynh | 486ea35f79 |
128
bulk_write.go
128
bulk_write.go
|
@ -1,128 +0,0 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/volatiletech/sqlboiler/v4/boil"
|
||||
)
|
||||
|
||||
const (
|
||||
indexSign = "$%d"
|
||||
decodeTag = "boil"
|
||||
)
|
||||
|
||||
type bulkInsertItem map[string]interface{}
|
||||
|
||||
type BulkInsertPayload struct {
|
||||
TableName string
|
||||
Data interface{}
|
||||
Columns []string
|
||||
}
|
||||
|
||||
func BulkInsert(ctx context.Context, db boil.ContextExecutor, payload BulkInsertPayload) error {
|
||||
// convert data to map
|
||||
insertRows, err := toMap(payload.Data)
|
||||
if err != nil {
|
||||
fmt.Printf("[postgresql] error when convert data to map: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
numOfColumns = len(payload.Columns)
|
||||
incSigns = make([]string, 0)
|
||||
insertValues = make([]interface{}, 0)
|
||||
listOfSigns = make([]string, numOfColumns)
|
||||
listOfColumns = make([]string, numOfColumns)
|
||||
)
|
||||
|
||||
// prepare array of dollar signs
|
||||
for i := range listOfSigns {
|
||||
listOfSigns[i] = indexSign
|
||||
listOfColumns[i] = payload.Columns[i]
|
||||
}
|
||||
|
||||
// prepare sign for every column
|
||||
signs := strings.Join(listOfSigns, ",")
|
||||
insertColumns := strings.Join(listOfColumns, ",")
|
||||
|
||||
for index, r := range insertRows {
|
||||
currentIncSigns := getIncSignValues(index, numOfColumns)
|
||||
incSigns = append(incSigns, fmt.Sprintf("("+signs+")",
|
||||
currentIncSigns...,
|
||||
))
|
||||
|
||||
currentInsertValues := getInsertValues(r, payload.Columns, numOfColumns)
|
||||
insertValues = append(insertValues, currentInsertValues...)
|
||||
}
|
||||
|
||||
// exec
|
||||
stm := getSQLStatement(payload.TableName, insertColumns, incSigns)
|
||||
_, err = db.ExecContext(ctx, stm, insertValues...)
|
||||
if err != nil {
|
||||
fmt.Printf("[postgresql] error when insert to db: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getSQLStatement(tableName, insertColumns string, incSigns []string) string {
|
||||
return fmt.Sprintf(`
|
||||
insert into %s (
|
||||
%s
|
||||
)
|
||||
values %s
|
||||
`, tableName, insertColumns, strings.Join(incSigns, ","))
|
||||
}
|
||||
|
||||
func getIncSignValues(index, numOfColumns int) (result []interface{}) {
|
||||
for i := 1; i <= numOfColumns; i++ {
|
||||
result = append(result, index*numOfColumns+i)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func getInsertValues(row bulkInsertItem, columns []string, numOfColumns int) (result []interface{}) {
|
||||
for i := 0; i < numOfColumns; i++ {
|
||||
result = append(result, row[columns[i]])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func toMap(input interface{}) ([]bulkInsertItem, error) {
|
||||
result := make([]bulkInsertItem, 0)
|
||||
|
||||
v := reflect.ValueOf(input)
|
||||
if v.Kind() == reflect.Ptr {
|
||||
v = v.Elem()
|
||||
}
|
||||
|
||||
// only accept slices
|
||||
if v.Kind() != reflect.Slice {
|
||||
err := fmt.Errorf("toMap only accepts slices; got %T", v)
|
||||
fmt.Printf("[postgresql] invalid type toMap: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// loop and assign data to result
|
||||
for x := 0; x < v.Len(); x++ {
|
||||
item := bulkInsertItem{}
|
||||
typ := v.Index(x).Type()
|
||||
|
||||
// loop each field
|
||||
for i := 0; i < v.Index(x).NumField(); i++ {
|
||||
fi := typ.Field(i)
|
||||
t := fi.Tag.Get(decodeTag)
|
||||
if t != "" {
|
||||
// set key of map to value in struct field
|
||||
item[t] = v.Index(x).Field(i).Interface()
|
||||
}
|
||||
}
|
||||
result = append(result, item)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
48
get.go
48
get.go
|
@ -1,48 +0,0 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/shopspring/decimal"
|
||||
"github.com/volatiletech/null/v8"
|
||||
)
|
||||
|
||||
func GetString(val null.String) string {
|
||||
if !val.Valid {
|
||||
return ""
|
||||
}
|
||||
return val.String
|
||||
}
|
||||
|
||||
func GetInt(val null.Int) int {
|
||||
if !val.Valid {
|
||||
return 0
|
||||
}
|
||||
return val.Int
|
||||
}
|
||||
|
||||
func GetBool(val null.Bool) bool {
|
||||
if !val.Valid {
|
||||
return false
|
||||
}
|
||||
return val.Bool
|
||||
}
|
||||
|
||||
func GetTime(val null.Time) (res time.Time) {
|
||||
if !val.Valid {
|
||||
return
|
||||
}
|
||||
return val.Time
|
||||
}
|
||||
|
||||
func GetJSON(val null.JSON) (res []byte) {
|
||||
if !val.Valid {
|
||||
return
|
||||
}
|
||||
return val.JSON
|
||||
}
|
||||
|
||||
// GetFloat64 ...
|
||||
func GetFloat64(val decimal.Decimal) float64 {
|
||||
return val.InexactFloat64()
|
||||
}
|
53
go.mod
53
go.mod
|
@ -1,52 +1,11 @@
|
|||
module git.selly.red/Selly-Modules/postgresql
|
||||
module github.com/Selly-Modules/postgresql
|
||||
|
||||
go 1.19
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/Masterminds/squirrel v1.5.0
|
||||
github.com/golang-migrate/migrate/v4 v4.15.2
|
||||
github.com/jackc/pgx/v4 v4.17.0
|
||||
github.com/shopspring/decimal v1.3.1
|
||||
github.com/volatiletech/null/v8 v8.1.2
|
||||
github.com/volatiletech/sqlboiler/v4 v4.12.0
|
||||
go.elastic.co/apm/module/apmsql/v2 v2.2.0
|
||||
golang.org/x/text v0.6.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/armon/go-radix v1.0.0 // indirect
|
||||
github.com/elastic/go-licenser v0.4.1 // indirect
|
||||
github.com/elastic/go-sysinfo v1.9.0 // indirect
|
||||
github.com/elastic/go-windows v1.0.1 // indirect
|
||||
github.com/friendsofgo/errors v0.9.2 // indirect
|
||||
github.com/gofrs/uuid v4.0.0+incompatible // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
|
||||
github.com/jackc/pgconn v1.13.0 // indirect
|
||||
github.com/jackc/pgio v1.0.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||
github.com/jackc/pgtype v1.12.0 // indirect
|
||||
github.com/jcchavezs/porto v0.4.0 // indirect
|
||||
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
|
||||
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
|
||||
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
|
||||
github.com/lib/pq v1.10.2 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/procfs v0.9.0 // indirect
|
||||
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
|
||||
github.com/volatiletech/inflect v0.0.1 // indirect
|
||||
github.com/volatiletech/randomize v0.0.1 // indirect
|
||||
github.com/volatiletech/strmangle v0.0.4 // indirect
|
||||
go.elastic.co/apm/v2 v2.2.0 // indirect
|
||||
go.elastic.co/fastjson v1.1.0 // indirect
|
||||
go.uber.org/atomic v1.10.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
|
||||
golang.org/x/mod v0.7.0 // indirect
|
||||
golang.org/x/sys v0.4.0 // indirect
|
||||
golang.org/x/tools v0.5.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||
howett.net/plist v1.0.0 // indirect
|
||||
github.com/jmoiron/sqlx v1.3.4
|
||||
github.com/lib/pq v1.10.2
|
||||
github.com/logrusorgru/aurora v2.0.3+incompatible
|
||||
go.elastic.co/apm/module/apmsql v1.14.0
|
||||
)
|
||||
|
|
51
migration.go
51
migration.go
|
@ -1,51 +0,0 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database/postgres"
|
||||
_ "github.com/golang-migrate/migrate/v4/source/file"
|
||||
)
|
||||
|
||||
func runMigration(db *sql.DB, server string) {
|
||||
// init migrate data
|
||||
driver, _ := postgres.WithInstance(db, &postgres.Config{})
|
||||
m, err := migrate.NewWithDatabaseInstance(
|
||||
fmt.Sprintf("file:///%s", getMigrationDirectoryPath(server)),
|
||||
"postgres",
|
||||
driver,
|
||||
)
|
||||
|
||||
// up
|
||||
if err == nil {
|
||||
if err = m.Up(); err != nil && err.Error() != "no change" {
|
||||
fmt.Printf("[postgresql] run migration error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
fmt.Printf("⚡️[postgres]: done migration \n")
|
||||
}
|
||||
}
|
||||
|
||||
func getMigrationDirectoryPath(server string) string {
|
||||
migrationDir := fmt.Sprintf("/external/postgresql/%s/migrations/sql", server)
|
||||
|
||||
dirname, err := os.Getwd()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Check path existed
|
||||
path := dirname + migrationDir
|
||||
if _, err = os.Stat(path); os.IsNotExist(err) {
|
||||
// Create if not existed
|
||||
err = os.Mkdir(path, 0755)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return path
|
||||
}
|
|
@ -1,72 +1,60 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
_ "github.com/jackc/pgx/v4/stdlib"
|
||||
"github.com/volatiletech/sqlboiler/v4/boil"
|
||||
"go.elastic.co/apm/module/apmsql/v2"
|
||||
_ "go.elastic.co/apm/module/apmsql/v2/pgxv4"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq" // For postgres dialect
|
||||
"github.com/logrusorgru/aurora"
|
||||
|
||||
"go.elastic.co/apm/module/apmsql"
|
||||
_ "go.elastic.co/apm/module/apmsql/pq" // For apm dialect
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Host string
|
||||
Port int
|
||||
User string
|
||||
Password string
|
||||
DBName string
|
||||
SSLMode string
|
||||
IsDebug bool
|
||||
MaxOpenConnections int
|
||||
MaxIdleConnections int
|
||||
ConnectionLifetime time.Duration
|
||||
UseElasticAPM bool
|
||||
}
|
||||
var (
|
||||
sqlxClient *sqlx.DB
|
||||
)
|
||||
|
||||
// Connect ...
|
||||
func Connect(cfg Config, server string) (db *sql.DB, err error) {
|
||||
uri := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
|
||||
cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode)
|
||||
// Connect to postgresql database
|
||||
func Connect(host, user, password, dbname, port, sslmode string) error {
|
||||
// Connect string
|
||||
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=%s TimeZone=UTC",
|
||||
host, user, password, dbname, port, sslmode,
|
||||
)
|
||||
|
||||
// connect
|
||||
if cfg.UseElasticAPM {
|
||||
db, err = apmsql.Open("pgx", uri)
|
||||
} else {
|
||||
db, err = sql.Open("pgx", uri)
|
||||
}
|
||||
// TODO: write case for SSL mode
|
||||
|
||||
// db, err := sqlx.Connect("postgres", dsn)
|
||||
// if err != nil {
|
||||
// log.Fatalln(err)
|
||||
// }
|
||||
|
||||
apmDB, err := apmsql.Open("postgres", dsn)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
// ping
|
||||
if err = db.Ping(); err != nil {
|
||||
fmt.Printf("[postgresql] pgx ping error: %s", err.Error())
|
||||
return nil, err
|
||||
db := sqlx.NewDb(apmDB, "postgres")
|
||||
|
||||
fmt.Println(aurora.Green("*** CONNECTED TO POSTGRESQL - SQLX: " + dsn))
|
||||
|
||||
// Config connection pool
|
||||
sqlDB := db.DB
|
||||
sqlDB.SetMaxOpenConns(100)
|
||||
sqlDB.SetMaxIdleConns(20)
|
||||
sqlDB.SetConnMaxLifetime(time.Minute * 5)
|
||||
|
||||
// Assign client
|
||||
sqlxClient = db
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// config
|
||||
if cfg.MaxOpenConnections == 0 {
|
||||
cfg.MaxOpenConnections = 25
|
||||
// GetSqlxInstance ...
|
||||
func GetSqlxInstance() *sqlx.DB {
|
||||
return sqlxClient
|
||||
}
|
||||
if cfg.MaxIdleConnections == 0 {
|
||||
cfg.MaxIdleConnections = 25
|
||||
}
|
||||
if cfg.ConnectionLifetime == 0 {
|
||||
cfg.ConnectionLifetime = 5 * time.Minute
|
||||
}
|
||||
db.SetMaxOpenConns(cfg.MaxOpenConnections)
|
||||
db.SetMaxIdleConns(cfg.MaxIdleConnections)
|
||||
db.SetConnMaxLifetime(cfg.ConnectionLifetime)
|
||||
|
||||
// run migration
|
||||
runMigration(db, server)
|
||||
|
||||
// debug mode
|
||||
boil.DebugMode = cfg.IsDebug
|
||||
|
||||
fmt.Printf("⚡️[postgres]: connected to %s:%d, with APM: %t \n", cfg.Host, cfg.Port, cfg.UseElasticAPM)
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
|
53
set.go
53
set.go
|
@ -1,53 +0,0 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/shopspring/decimal"
|
||||
"github.com/volatiletech/null/v8"
|
||||
)
|
||||
|
||||
func SetString(val string) null.String {
|
||||
return null.String{
|
||||
String: val,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
func SetInt(val int) null.Int {
|
||||
return null.Int{
|
||||
Int: val,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
func SetBool(val bool) (res null.Bool) {
|
||||
return null.Bool{
|
||||
Bool: val,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
func SetTime(val time.Time) (res null.Time) {
|
||||
if val.IsZero() {
|
||||
return
|
||||
}
|
||||
return null.Time{
|
||||
Time: val,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
func SetJSON(val []byte) (res null.JSON) {
|
||||
if val == nil {
|
||||
return
|
||||
}
|
||||
return null.JSON{
|
||||
JSON: val,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
func SetFloat64(val float64) decimal.Decimal {
|
||||
return decimal.NewFromFloat(val)
|
||||
}
|
38
string.go
38
string.go
|
@ -1,38 +0,0 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"golang.org/x/text/runes"
|
||||
"golang.org/x/text/transform"
|
||||
"golang.org/x/text/unicode/norm"
|
||||
"regexp"
|
||||
"strings"
|
||||
"unicode"
|
||||
)
|
||||
|
||||
// RemoveDiacritics ...
|
||||
func RemoveDiacritics(s string) string {
|
||||
if s != "" {
|
||||
s = strings.ToLower(s)
|
||||
s = replaceStringWithRegex(s, `đ`, "d")
|
||||
t := transform.Chain(norm.NFD, runes.Remove(runes.In(unicode.Mn)), norm.NFC)
|
||||
result, _, _ := transform.String(t, s)
|
||||
result = replaceStringWithRegex(result, `[^a-zA-Z0-9\s]`, "")
|
||||
|
||||
return result
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// replaceStringWithRegex ...
|
||||
func replaceStringWithRegex(src string, regex string, replaceText string) string {
|
||||
reg := regexp.MustCompile(regex)
|
||||
return reg.ReplaceAllString(src, replaceText)
|
||||
}
|
||||
|
||||
// TransformKeywordToSearchString ...
|
||||
func TransformKeywordToSearchString(keyword string) string {
|
||||
s := strings.Trim(keyword, " ")
|
||||
s = RemoveDiacritics(s)
|
||||
s = strings.ReplaceAll(s, " ", "&")
|
||||
return s + ":*" // For prefix search
|
||||
}
|
Loading…
Reference in New Issue