tempestas

A REST API for processing sensor.community data
git clone https://git.bracken.jp/tempestas.git
Log | Files | Refs | README | LICENSE

storage.go (3433B)


      1 package storage
      2 
      3 import (
      4 	"context"
      5 	"database/sql"
      6 	"fmt"
      7 	"time"
      8 )
      9 
     10 const (
     11 	INSERT_READING = "CALL save_reading($1, $2, $3, $4, $5)"
     12 	QUERY_SENSORS  = "SELECT sensor_id, name FROM sensors ORDER BY sensor_id"
     13 	QUERY_READINGS = "SELECT " +
     14 		"  sensor_id, sw_version, reading_time, reading_type, reading_value " +
     15 		"FROM " +
     16 		"  sensor_data " +
     17 		"WHERE " +
     18 		"  sensor_id = $1 AND reading_type = $2 AND reading_time >= $3 AND reading_time < $4 " +
     19 		"ORDER BY " +
     20 		"  reading_time"
     21 )
     22 
     23 type Reading struct {
     24 	SensorId        string    `json:"sensor_id"`
     25 	SoftwareVersion string    `json:"sw_version"`
     26 	Time            time.Time `json:"time",string`
     27 	Type            string    `json:"type"`
     28 	Value           float64   `json:"value"`
     29 }
     30 
     31 type Sensor struct {
     32 	Id   string `json:"id"`
     33 	Name string `json:"name"`
     34 }
     35 
     36 type DataStore interface {
     37 	Close()
     38 	StoreReading(r *Reading) error
     39 	QueryReadings(sensorId string, readingType string, start time.Time, end time.Time) ([]Reading, error)
     40 	QuerySensors() ([]Sensor, error)
     41 }
     42 
     43 type PostgresDataStore struct {
     44 	db *sql.DB
     45 }
     46 
     47 func ConnectPostgres(dbname string, user string, password string) *PostgresDataStore {
     48 	var s = new(PostgresDataStore)
     49 	var err error
     50 
     51 	// Create the database object. A nil error does not guarantee the DB is connected.
     52 	dbinfo := fmt.Sprintf("user=%s password=%s dbname=%s sslmode=disable", user, password, dbname)
     53 	if s.db, err = sql.Open("postgres", dbinfo); err != nil {
     54 		panic("Error connecting to database")
     55 	}
     56 
     57 	// Ping the DB to verify we can connect.
     58 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
     59 	defer cancel()
     60 	if err := s.db.PingContext(ctx); err != nil {
     61 		panic("Error connecting to database")
     62 	}
     63 
     64 	return s
     65 }
     66 
     67 func (s *PostgresDataStore) Close() {
     68 	s.db.Close()
     69 }
     70 
     71 func (s *PostgresDataStore) StoreReading(r *Reading) error {
     72 	_, err := s.db.Exec(INSERT_READING, r.SensorId, r.SoftwareVersion, r.Time, r.Type, r.Value)
     73 	return err
     74 }
     75 
     76 func (s *PostgresDataStore) QueryReadings(sensorId string,
     77 	readingType string,
     78 	start time.Time,
     79 	end time.Time) ([]Reading, error) {
     80 
     81 	rows, err := s.db.Query(QUERY_READINGS, sensorId, readingType, start, end)
     82 	if err != nil {
     83 		return nil, err
     84 	}
     85 	defer rows.Close()
     86 
     87 	var readings []Reading
     88 	for rows.Next() {
     89 		var r Reading
     90 		if err := rows.Scan(&r.SensorId, &r.SoftwareVersion, &r.Time, &r.Type, &r.Value); err != nil {
     91 			return readings, err
     92 		}
     93 		readings = append(readings, r)
     94 	}
     95 	if err = rows.Err(); err != nil {
     96 		return readings, err
     97 	}
     98 	return readings, nil
     99 }
    100 
    101 func (s *PostgresDataStore) QuerySensors() ([]Sensor, error) {
    102 	rows, err := s.db.Query(QUERY_SENSORS)
    103 	if err != nil {
    104 		return nil, err
    105 	}
    106 	defer rows.Close()
    107 
    108 	var sensors []Sensor
    109 	for rows.Next() {
    110 		var s Sensor
    111 		if err := rows.Scan(&s.Id, &s.Name); err != nil {
    112 			return sensors, err
    113 		}
    114 		sensors = append(sensors, s)
    115 	}
    116 	if err = rows.Err(); err != nil {
    117 		return sensors, err
    118 	}
    119 	return sensors, nil
    120 }
    121 
    122 type NullDataStore struct {
    123 }
    124 
    125 func ConnectNull() *NullDataStore {
    126 	return new(NullDataStore)
    127 }
    128 
    129 func (s *NullDataStore) Close() {
    130 }
    131 
    132 func (s *NullDataStore) StoreReading(r *Reading) error {
    133 	return nil
    134 }
    135 
    136 func (s *NullDataStore) QueryReadings(
    137 	sensorId string,
    138 	readingType string,
    139 	start time.Time,
    140 	end time.Time) ([]Reading, error) {
    141 
    142 	var readings []Reading
    143 	return readings, nil
    144 }
    145 
    146 func (s *NullDataStore) QuerySensors() ([]Sensor, error) {
    147 	var sensors []Sensor
    148 	return sensors, nil
    149 }