File: dataflow_top_stock_calc.md

This program determines which stock had the highest price in a given year.
It is an example from chapter 1 "Introduction", section 1.2 "What's Scala?" of the book
[Programming Scala: Tackle Multi-Core Complexity on the Java Virtual Machine](http://pragprog.com/book/vsscala/programming-scala).

## What It Does

This program takes a list of one or more stock symbols and a year. It then concurrently
obtains the relevant stock data from the Alpha Vantage service for each symbol. Once all
the data has been retrieved, the program determines which stock had the highest year-end
closing price.

To use this example, you need to obtain a free API key from [Alpha Vantage](https://www.alphavantage.co/support/#api-key).


#### The Ruby Code

```ruby
require 'concurrent'
require 'csv'
require 'open-uri'

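# Fetches monthly prices for +symbol+ and returns [symbol, price], where price
# is the highest monthly close recorded in +year+ (nil if no rows match).
# Returns :rate_limit_exceeded if the response mentions 'call frequency'.
def get_year_end_closing(symbol, year, api_key)
  uri = "https://www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol=#{symbol}&apikey=#{api_key}&datatype=csv"
  data = []
  csv = URI.parse(uri).read
  return :rate_limit_exceeded if csv.include?('call frequency')

  CSV.parse(csv, headers: true) do |row|
    data << row['close'].to_f if row['timestamp'].include?(year.to_s)
  end
  price = data.max
  [symbol, price]
end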

def get_top_stock(symbols, year, timeout = 10)
  api_key = ENV['ALPHAVANTAGE_KEY']
  abort(error_message) unless api_key

  # Fan out: one dataflow task per symbol, each fetching independently.
  stock_prices = symbols.collect { |symbol| Concurrent::dataflow { get_year_end_closing(symbol, year, api_key) } }
  # Fan in: this task runs only after every fetch above has completed.
  Concurrent::dataflow(*stock_prices) { |*prices|
    next :rate_limit_exceeded if prices.include?(:rate_limit_exceeded)
    prices.reduce(['', 0.0]) { |highest, price| price.last > highest.last ? price : highest }
  }.value(timeout) # nil if no result is available within +timeout+ seconds
end

def error_message
  <<~EOF
    PLEASE provide an Alpha Vantage API key for the example to work
    usage:
      ALPHAVANTAGE_KEY=YOUR_API_KEY bundle exec ruby top-stock-scala/top-stock.rb
  EOF
end

symbols = ['AAPL', 'GOOG', 'IBM', 'ORCL', 'MSFT']
year = 2018

result = get_top_stock(symbols, year)

if result == :rate_limit_exceeded
  puts "API rate limit exceeded"
else
  top_stock, highest_price = result
  puts "Top stock of #{year} is #{top_stock} closing at price $#{highest_price}"
end
```
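
The parsing and selection steps can be tried without an API key or network access. The following is a minimal sketch under stated assumptions: the CSV text is made-up sample data in the `timestamp`/`close` column layout that the code above reads, and `highest_close_in_year` and `pick_top` are hypothetical helper names that do not appear in the example. It matches the year with `start_with?` rather than `include?`, which assumes timestamps begin with the four-digit year.

```ruby
require 'csv'

# Made-up sample rows for illustration only; not real market data.
SAMPLE_CSV = <<~SAMPLE
  timestamp,open,high,low,close,volume
  2019-01-31,10.0,12.0,9.0,11.5,1000
  2018-12-31,9.0,10.5,8.5,10.0,1200
  2018-11-30,8.0,9.5,7.5,9.25,900
SAMPLE

# Hypothetical helper: highest monthly close within the given year,
# or nil when no row falls in that year.
def highest_close_in_year(csv_text, year)
  closes = []
  CSV.parse(csv_text, headers: true) do |row|
    closes << row['close'].to_f if row['timestamp'].start_with?(year.to_s)
  end
  closes.max
end

# Hypothetical helper: the same reduction used in get_top_stock,
# keeping whichever [symbol, price] pair has the higher price.
def pick_top(prices)
  prices.reduce(['', 0.0]) { |highest, price| price.last > highest.last ? price : highest }
end

# 'AAA' and 'BBB' are placeholder symbols, not real tickers.
prices = [['AAA', highest_close_in_year(SAMPLE_CSV, 2018)], ['BBB', 7.5]]
top = pick_top(prices)
puts "Top stock is #{top.first} at #{top.last}"
```

With this sample, only the two 2018 rows are considered, so `top` is `['AAA', 10.0]`. A year with no matching rows yields `nil` from `highest_close_in_year`, which the reduction does not handle; a caller would need to filter such entries out first.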

#### The Scala Code

```scala
//START:PART1
import scala.actors._
import Actor._

val symbols = List( "AAPL", "GOOG", "IBM", "JAVA", "MSFT")
val receiver = self
val year = 2008

symbols.foreach { symbol =>
  actor { receiver ! getYearEndClosing(symbol, year) }
}

val (topStock, highestPrice) = getTopStock(symbols.length)

printf("Top stock of %d is %s closing at price %f\n", year, topStock, highestPrice)
//END:PART1

//START:PART2
def getYearEndClosing(symbol : String, year : Int) = {
  val url = "http://ichart.finance.yahoo.com/table.csv?s=" +
    symbol + "&a=11&b=01&c=" + year + "&d=11&e=31&f=" + year + "&g=m"

  val data = io.Source.fromURL(url).mkString
  val price = data.split("\n")(1).split(",")(4).toDouble
  (symbol, price)
}
//END:PART2

//START:PART3
def getTopStock(count : Int) : (String, Double) = {
  (1 to count).foldLeft("", 0.0) { (previousHigh, index) =>
    receiveWithin(10000) {
      case (symbol : String, price : Double) =>
        if (price > previousHigh._2) (symbol, price) else previousHigh
    }
  }
}
//END:PART3
```