storage.go (3433B)
1 package storage 2 3 import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "time" 8 ) 9 10 const ( 11 INSERT_READING = "CALL save_reading($1, $2, $3, $4, $5)" 12 QUERY_SENSORS = "SELECT sensor_id, name FROM sensors ORDER BY sensor_id" 13 QUERY_READINGS = "SELECT " + 14 " sensor_id, sw_version, reading_time, reading_type, reading_value " + 15 "FROM " + 16 " sensor_data " + 17 "WHERE " + 18 " sensor_id = $1 AND reading_type = $2 AND reading_time >= $3 AND reading_time < $4 " + 19 "ORDER BY " + 20 " reading_time" 21 ) 22 23 type Reading struct { 24 SensorId string `json:"sensor_id"` 25 SoftwareVersion string `json:"sw_version"` 26 Time time.Time `json:"time",string` 27 Type string `json:"type"` 28 Value float64 `json:"value"` 29 } 30 31 type Sensor struct { 32 Id string `json:"id"` 33 Name string `json:"name"` 34 } 35 36 type DataStore interface { 37 Close() 38 StoreReading(r *Reading) error 39 QueryReadings(sensorId string, readingType string, start time.Time, end time.Time) ([]Reading, error) 40 QuerySensors() ([]Sensor, error) 41 } 42 43 type PostgresDataStore struct { 44 db *sql.DB 45 } 46 47 func ConnectPostgres(dbname string, user string, password string) *PostgresDataStore { 48 var s = new(PostgresDataStore) 49 var err error 50 51 // Create the database object. A nil error does not guarantee the DB is connected. 52 dbinfo := fmt.Sprintf("user=%s password=%s dbname=%s sslmode=disable", user, password, dbname) 53 if s.db, err = sql.Open("postgres", dbinfo); err != nil { 54 panic("Error connecting to database") 55 } 56 57 // Ping the DB to verify we can connect. 58 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) 59 defer cancel() 60 if err := s.db.PingContext(ctx); err != nil { 61 panic("Error connecting to database") 62 } 63 64 return s 65 } 66 67 func (s *PostgresDataStore) Close() { 68 s.db.Close() 69 } 70 71 func (s *PostgresDataStore) StoreReading(r *Reading) error { 72 _, err := s.db.Exec(INSERT_READING, r.SensorId, r.SoftwareVersion, r.Time, r.Type, r.Value) 73 return err 74 } 75 76 func (s *PostgresDataStore) QueryReadings(sensorId string, 77 readingType string, 78 start time.Time, 79 end time.Time) ([]Reading, error) { 80 81 rows, err := s.db.Query(QUERY_READINGS, sensorId, readingType, start, end) 82 if err != nil { 83 return nil, err 84 } 85 defer rows.Close() 86 87 var readings []Reading 88 for rows.Next() { 89 var r Reading 90 if err := rows.Scan(&r.SensorId, &r.SoftwareVersion, &r.Time, &r.Type, &r.Value); err != nil { 91 return readings, err 92 } 93 readings = append(readings, r) 94 } 95 if err = rows.Err(); err != nil { 96 return readings, err 97 } 98 return readings, nil 99 } 100 101 func (s *PostgresDataStore) QuerySensors() ([]Sensor, error) { 102 rows, err := s.db.Query(QUERY_SENSORS) 103 if err != nil { 104 return nil, err 105 } 106 defer rows.Close() 107 108 var sensors []Sensor 109 for rows.Next() { 110 var s Sensor 111 if err := rows.Scan(&s.Id, &s.Name); err != nil { 112 return sensors, err 113 } 114 sensors = append(sensors, s) 115 } 116 if err = rows.Err(); err != nil { 117 return sensors, err 118 } 119 return sensors, nil 120 } 121 122 type NullDataStore struct { 123 } 124 125 func ConnectNull() *NullDataStore { 126 return new(NullDataStore) 127 } 128 129 func (s *NullDataStore) Close() { 130 } 131 132 func (s *NullDataStore) StoreReading(r *Reading) error { 133 return nil 134 } 135 136 func (s *NullDataStore) QueryReadings( 137 sensorId string, 138 readingType string, 139 start time.Time, 140 end time.Time) ([]Reading, error) { 141 142 var readings []Reading 143 return readings, nil 144 } 145 146 func (s *NullDataStore) QuerySensors() ([]Sensor, error) { 147 var sensors []Sensor 148 return sensors, nil 149 }