Skip to content

Commit

Permalink
moved faraday middleware into the zipkin client gem
Browse files Browse the repository at this point in the history
  • Loading branch information
JordiPolo committed Sep 6, 2015
1 parent 4331a8c commit 4f9d1c3
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 116 deletions.
2 changes: 2 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--color
--require spec_helper
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# 0.5.0
Added Faraday middleware to the repo

# 0.4.0
Use Thread safe Finagle version to store the traces
32 changes: 30 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# ZipkinTracer

Rack integration middleware for Zipkin tracing.
[![Build Status](https://api.travis-ci.org/openzipkin/zipkin-tracer.svg?branch=master)](https://travis-ci.org/openzipkin/zipkin-tracer)

Rack and Faraday integration middlewares for Zipkin tracing.

## Usage

### Sending traces on incoming requests

Options can be provided via Rails.config for a Rails 3+ app, or can be passed
as a hash argument to the Rack plugin.

Expand All @@ -24,14 +28,38 @@ where Rails.config.zipkin_tracer or config is a hash that can contain the follow
* `:whitelist_plugin` - plugin function which recieves Rack env and will force sampling if it returns true
* `:zookeeper` - plugin function which uses zookeeper and kafka instead of scribe as the transport

## Warning
### Warning

NOTE that access to the response body (available in the annotate
plugin) may cause problems in the case that a response is being
streamed; in general, this should be avoided (see the Rack
specification for more detail and instructions for properly hijacking
responses).


### Sending traces on outgoing requests with Faraday

First, for the Faraday middleware to have the correct trace ID, you should be using the rack middleware in your application.

Then include ZipkinTracer::FaradayHandler as a Faraday middleware:

require 'faraday'
require 'zipkin-tracer'

conn = Faraday.new(:url => 'http://localhost:9292/') do |faraday|
# 'service_name' is optional (but recommended)
faraday.use ZipkinTracer::FaradayHandler, 'service_name'
# default Faraday stack
faraday.request :url_encoded
faraday.adapter Faraday.default_adapter
end

Note that supplying the service name for the destination service is
optional; the tracing will default to a service name derived from the
first section of the destination URL (e.g. 'service.example.com' =>
'service').


## Plugins

### annotate_plugin
Expand Down
115 changes: 2 additions & 113 deletions lib/zipkin-tracer.rb
Original file line number Diff line number Diff line change
@@ -1,113 +1,2 @@
# Copyright 2012 Twitter Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'finagle-thrift'
require 'finagle-thrift/trace'
require 'scribe'

require 'zipkin-tracer/careless_scribe'

if RUBY_PLATFORM == 'java'
require 'hermann/producer'
require 'zipkin-tracer/zipkin_kafka_tracer'
end

module ZipkinTracer extend self

class RackHandler
B3_REQUIRED_HEADERS = %w[HTTP_X_B3_TRACEID HTTP_X_B3_PARENTSPANID HTTP_X_B3_SPANID HTTP_X_B3_SAMPLED]
B3_OPT_HEADERS = %w[HTTP_X_B3_FLAGS]

def initialize(app, config=nil)
@app = app
@lock = Mutex.new

config ||= app.config.zipkin_tracer # if not specified, try on app (e.g. Rails 3+)
@service_name = config[:service_name]
@service_port = config[:service_port]

::Trace.tracer = if config[:scribe_server] && defined?(::Scribe)
scribe = config[:scribe_server] ? Scribe.new(config[:scribe_server]) : Scribe.new()
scribe_max_buffer = config[:scribe_max_buffer] ? config[:scribe_max_buffer] : 10
::Trace::ZipkinTracer.new(CarelessScribe.new(scribe), scribe_max_buffer)
elsif config[:zookeeper] && RUBY_PLATFORM == 'java' && defined?(::Hermann)
kafkaTracer = ::Trace::ZipkinKafkaTracer.new
kafkaTracer.connect(config[:zookeeper])
kafkaTracer
end

@sample_rate = config[:sample_rate] ? config[:sample_rate] : 0.1
@annotate_plugin = config[:annotate_plugin] # call for trace annotation
@filter_plugin = config[:filter_plugin] # skip tracing if returns false
@whitelist_plugin = config[:whitelist_plugin] # force sampling if returns true
end

def call(env)
# skip certain requests
return @app.call(env) if filtered?(env)

::Trace.default_endpoint = ::Trace.default_endpoint.with_service_name(@service_name).with_port(@service_port)
::Trace.sample_rate=(@sample_rate)
whitelisted = force_sample?(env)
id = get_or_create_trace_id(env, whitelisted) # note that this depends on the sample rate being set
tracing_filter(id, env, whitelisted) { @app.call(env) }
end

private
def annotate(env, status, response_headers, response_body)
@annotate_plugin.call(env, status, response_headers, response_body) if @annotate_plugin
end

def filtered?(env)
@filter_plugin && !@filter_plugin.call(env)
end

def force_sample?(env)
@whitelist_plugin && @whitelist_plugin.call(env)
end

def tracing_filter(trace_id, env, whitelisted=false)
@lock.synchronize do
::Trace.push(trace_id)
::Trace.set_rpc_name(env["REQUEST_METHOD"]) # get/post and all that jazz
::Trace.record(::Trace::BinaryAnnotation.new("http.uri", env["PATH_INFO"], "STRING", ::Trace.default_endpoint))
::Trace.record(::Trace::Annotation.new(::Trace::Annotation::SERVER_RECV, ::Trace.default_endpoint))
::Trace.record(::Trace::Annotation.new('whitelisted', ::Trace.default_endpoint)) if whitelisted
end
status, headers, body = yield if block_given?
ensure
@lock.synchronize do
::Trace.record(::Trace::Annotation.new(::Trace::Annotation::SERVER_SEND, ::Trace.default_endpoint))
annotate(env, status, headers, body)
::Trace.pop
end
end

private
def get_or_create_trace_id(env, whitelisted, default_flags = ::Trace::Flags::EMPTY)
trace_parameters = if B3_REQUIRED_HEADERS.all? { |key| env.has_key?(key) }
env.values_at(*B3_REQUIRED_HEADERS)
else
new_id = Trace.generate_id
[new_id, nil, new_id, ("true" if whitelisted || Trace.should_sample?)]
end
trace_parameters[3] = (trace_parameters[3] == "true")

trace_parameters += env.values_at(*B3_OPT_HEADERS) # always check flags
trace_parameters[4] = (trace_parameters[4] || default_flags).to_i

Trace::TraceId.new(*trace_parameters)
end
end

end
require 'zipkin-tracer/rack/zipkin-tracer'
require 'zipkin-tracer/faraday/zipkin-tracer'
56 changes: 56 additions & 0 deletions lib/zipkin-tracer/faraday/zipkin-tracer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
require 'faraday'
require 'finagle-thrift'
require 'finagle-thrift/trace'
require 'uri'

module ZipkinTracer
class FaradayHandler < ::Faraday::Middleware
B3_HEADERS = {
:trace_id => "X-B3-TraceId",
:parent_id => "X-B3-ParentSpanId",
:span_id => "X-B3-SpanId",
:sampled => "X-B3-Sampled",
:flags => "X-B3-Flags"
}.freeze

def initialize(app, service_name=nil)
@app = app
@service_name = service_name
end

def call(env)
# handle either a URI object (passed by Faraday v0.8.x in testing), or something string-izable
url = env[:url].respond_to?(:host) ? env[:url] : URI.parse(env[:url].to_s)
service_name = @service_name || url.host.split('.').first # default to url-derived service name
endpoint = ::Trace::Endpoint.new(host_ip_for(url.host), url.port, service_name)

::Trace.unwind do
trace_id = ::Trace.id
::Trace.push(trace_id.next_id)
B3_HEADERS.each do |method, header|
env[:request_headers][header] = ::Trace.id.send(method).to_s
end

# annotate with method (GET/POST/etc.) and uri path
::Trace.set_rpc_name(env[:method].to_s.upcase)
::Trace.record(::Trace::BinaryAnnotation.new("http.uri", url.path, "STRING", endpoint))
::Trace.record(::Trace::Annotation.new(::Trace::Annotation::CLIENT_SEND, endpoint))
result = @app.call(env).on_complete do |renv|
# record HTTP status code on response
::Trace.record(::Trace::BinaryAnnotation.new("http.status", [renv[:status]].pack('n'), "I16", endpoint))
end
::Trace.record(::Trace::Annotation.new(::Trace::Annotation::CLIENT_RECV, endpoint))
result
end
end

# get host IP for specified hostname, catching exceptions
def host_ip_for(hostname)
::Trace::Endpoint.host_to_i32(hostname)
rescue
# default to 0.0.0.0 if lookup fails
0x00000000
end
private :host_ip_for
end
end
113 changes: 113 additions & 0 deletions lib/zipkin-tracer/rack/zipkin-tracer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2012 Twitter Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'finagle-thrift'
require 'finagle-thrift/trace'
require 'scribe'

require 'zipkin-tracer/careless_scribe'

if RUBY_PLATFORM == 'java'
require 'hermann/producer'
require 'zipkin-tracer/zipkin_kafka_tracer'
end

module ZipkinTracer extend self

class RackHandler
B3_REQUIRED_HEADERS = %w[HTTP_X_B3_TRACEID HTTP_X_B3_PARENTSPANID HTTP_X_B3_SPANID HTTP_X_B3_SAMPLED]
B3_OPT_HEADERS = %w[HTTP_X_B3_FLAGS]

def initialize(app, config=nil)
@app = app
@lock = Mutex.new

config ||= app.config.zipkin_tracer # if not specified, try on app (e.g. Rails 3+)
@service_name = config[:service_name]
@service_port = config[:service_port]

::Trace.tracer = if config[:scribe_server] && defined?(::Scribe)
scribe = config[:scribe_server] ? Scribe.new(config[:scribe_server]) : Scribe.new()
scribe_max_buffer = config[:scribe_max_buffer] ? config[:scribe_max_buffer] : 10
::Trace::ZipkinTracer.new(CarelessScribe.new(scribe), scribe_max_buffer)
elsif config[:zookeeper] && RUBY_PLATFORM == 'java' && defined?(::Hermann)
kafkaTracer = ::Trace::ZipkinKafkaTracer.new
kafkaTracer.connect(config[:zookeeper])
kafkaTracer
end

@sample_rate = config[:sample_rate] ? config[:sample_rate] : 0.1
@annotate_plugin = config[:annotate_plugin] # call for trace annotation
@filter_plugin = config[:filter_plugin] # skip tracing if returns false
@whitelist_plugin = config[:whitelist_plugin] # force sampling if returns true
end

def call(env)
# skip certain requests
return @app.call(env) if filtered?(env)

::Trace.default_endpoint = ::Trace.default_endpoint.with_service_name(@service_name).with_port(@service_port)
::Trace.sample_rate=(@sample_rate)
whitelisted = force_sample?(env)
id = get_or_create_trace_id(env, whitelisted) # note that this depends on the sample rate being set
tracing_filter(id, env, whitelisted) { @app.call(env) }
end

private
def annotate(env, status, response_headers, response_body)
@annotate_plugin.call(env, status, response_headers, response_body) if @annotate_plugin
end

def filtered?(env)
@filter_plugin && !@filter_plugin.call(env)
end

def force_sample?(env)
@whitelist_plugin && @whitelist_plugin.call(env)
end

def tracing_filter(trace_id, env, whitelisted=false)
@lock.synchronize do
::Trace.push(trace_id)
::Trace.set_rpc_name(env["REQUEST_METHOD"]) # get/post and all that jazz
::Trace.record(::Trace::BinaryAnnotation.new("http.uri", env["PATH_INFO"], "STRING", ::Trace.default_endpoint))
::Trace.record(::Trace::Annotation.new(::Trace::Annotation::SERVER_RECV, ::Trace.default_endpoint))
::Trace.record(::Trace::Annotation.new('whitelisted', ::Trace.default_endpoint)) if whitelisted
end
status, headers, body = yield if block_given?
ensure
@lock.synchronize do
::Trace.record(::Trace::Annotation.new(::Trace::Annotation::SERVER_SEND, ::Trace.default_endpoint))
annotate(env, status, headers, body)
::Trace.pop
end
end

private
def get_or_create_trace_id(env, whitelisted, default_flags = ::Trace::Flags::EMPTY)
trace_parameters = if B3_REQUIRED_HEADERS.all? { |key| env.has_key?(key) }
env.values_at(*B3_REQUIRED_HEADERS)
else
new_id = Trace.generate_id
[new_id, nil, new_id, ("true" if whitelisted || Trace.should_sample?)]
end
trace_parameters[3] = (trace_parameters[3] == "true")

trace_parameters += env.values_at(*B3_OPT_HEADERS) # always check flags
trace_parameters[4] = (trace_parameters[4] || default_flags).to_i

Trace::TraceId.new(*trace_parameters)
end
end

end
2 changes: 1 addition & 1 deletion lib/zipkin-tracer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
module ZipkinTracer
VERSION = "0.4.0"
VERSION = "0.5.0"
end

Loading

0 comments on commit 4f9d1c3

Please sign in to comment.